#Imports

In [31]:
import random
import cv2
import numpy as np
from PIL import Image
from time import sleep
from tqdm import tqdm
from collections import deque
from keras.models import Sequential, load_model
from keras.layers import Dense
import sys
import matplotlib.pyplot as plt
import numpy as np
from statistics import mean

#Tetris Environment (credit to Nuno-Faria; changed to fit the Stanford model)

In [32]:
class Tetris:
    """Tetris environment that describes candidate placements with board features.

    The board is a list of ``BOARD_HEIGHT`` rows, each a list of
    ``BOARD_WIDTH`` cells; row 0 is the top row. A move is a pair
    ``(x, rotation)``: the current piece is put at column offset ``x`` in the
    given rotation and dropped straight down until it lands.
    """

    # Cell values used in the board grid
    MAP_EMPTY = 0
    MAP_BLOCK = 1
    MAP_PLAYER = 2

    # Board size in cells
    BOARD_WIDTH = 10
    BOARD_HEIGHT = 20

    # Piece shapes: piece id -> rotation in degrees -> four (x, y) cell offsets
    TETROMINOS = {
        0: { 0: [(0,0), (1,0), (2,0), (3,0)],
             90: [(1,0), (1,1), (1,2), (1,3)],
             180: [(3,0), (2,0), (1,0), (0,0)],
             270: [(1,3), (1,2), (1,1), (1,0)] },
        1: { 0: [(1,0), (0,1), (1,1), (2,1)],
             90: [(0,1), (1,2), (1,1), (1,0)],
             180: [(1,2), (2,1), (1,1), (0,1)],
             270: [(2,1), (1,0), (1,1), (1,2)] },
        2: { 0: [(1,0), (1,1), (1,2), (2,2)],
             90: [(0,1), (1,1), (2,1), (2,0)],
             180: [(1,2), (1,1), (1,0), (0,0)],
             270: [(2,1), (1,1), (0,1), (0,2)] },
        3: { 0: [(1,0), (1,1), (1,2), (0,2)],
             90: [(0,1), (1,1), (2,1), (2,2)],
             180: [(1,2), (1,1), (1,0), (2,0)],
             270: [(2,1), (1,1), (0,1), (0,0)] },
        4: { 0: [(0,0), (1,0), (1,1), (2,1)],
             90: [(0,2), (0,1), (1,1), (1,0)],
             180: [(2,1), (1,1), (1,0), (0,0)],
             270: [(1,0), (1,1), (0,1), (0,2)] },
        5: { 0: [(2,0), (1,0), (1,1), (0,1)],
             90: [(0,0), (0,1), (1,1), (1,2)],
             180: [(0,1), (1,1), (1,0), (2,0)],
             270: [(1,2), (1,1), (0,1), (0,0)] },
        6: { 0: [(1,0), (2,0), (1,1), (2,1)],
             90: [(1,0), (2,0), (1,1), (2,1)],
             180: [(1,0), (2,0), (1,1), (2,1)],
             270: [(1,0), (2,0), (1,1), (2,1)] }
    }

    # Colour per cell value (0 is background, 1 is placed, 2 is active)
    COLORS = {
        0: (255, 255, 255),
        1: (247, 64, 99),
        2: (0, 167, 247),
    }

    def __init__(self):
        """Start a new game."""
        self.reset()

    def reset(self):
        """Reset board, bag, pieces and score.

        Returns:
            ``[lines, holes, total_bumpiness, sum_height]`` for the fresh board.
        """
        self.board = [[Tetris.MAP_EMPTY] * Tetris.BOARD_WIDTH
                      for _ in range(Tetris.BOARD_HEIGHT)]
        self.game_over = False
        self._refill_bag()
        self.next_piece = self.bag.pop()
        self._new_round()
        self.score = 0
        return self._get_board_props(self.board)

    def _refill_bag(self):
        """Refill the bag with one of each piece id, in shuffled order."""
        self.bag = list(range(len(Tetris.TETROMINOS)))
        random.shuffle(self.bag)

    def _new_round(self):
        """Promote the next piece to current, draw a new next piece and spawn it.

        Sets ``game_over`` when the spawned piece already collides.
        """
        if not self.bag:
            self._refill_bag()
        self.current_piece = self.next_piece
        self.next_piece = self.bag.pop()
        self.current_pos = [3, 0]
        self.current_rotation = 0
        if self._check_collision(self._get_rotated_piece(), self.current_pos):
            self.game_over = True

    def _get_rotated_piece(self):
        """Return the cell offsets of the current piece in its current rotation."""
        return Tetris.TETROMINOS[self.current_piece][self.current_rotation]

    def _get_complete_board(self):
        """Return a copy of the board with the active piece drawn as MAP_PLAYER."""
        offset_x, offset_y = self.current_pos
        board = [row[:] for row in self.board]
        for dx, dy in self._get_rotated_piece():
            board[dy + offset_y][dx + offset_x] = Tetris.MAP_PLAYER
        return board

    def _check_collision(self, piece, pos):
        """Return True if ``piece`` at ``pos`` leaves the board or overlaps a block."""
        for dx, dy in piece:
            x = dx + pos[0]
            y = dy + pos[1]
            if not (0 <= x < Tetris.BOARD_WIDTH and 0 <= y < Tetris.BOARD_HEIGHT):
                return True
            if self.board[y][x] == Tetris.MAP_BLOCK:
                return True
        return False

    def _add_piece_to_board(self, piece, pos):
        """Return a copy of the board with ``piece`` written as blocks at ``pos``."""
        board = [row[:] for row in self.board]
        for dx, dy in piece:
            board[dy + pos[1]][dx + pos[0]] = Tetris.MAP_BLOCK
        return board

    def _clear_lines(self, board):
        """Remove completely filled rows and pad the top with empty rows.

        Returns:
            ``(number_of_cleared_rows, board)``. When nothing is cleared the
            very same ``board`` object is returned.
        """
        # Count blocks explicitly so the check does not rely on MAP_BLOCK == 1
        full_rows = {index for index, row in enumerate(board)
                     if row.count(Tetris.MAP_BLOCK) == Tetris.BOARD_WIDTH}
        if full_rows:
            remaining = [row for index, row in enumerate(board) if index not in full_rows]
            padding = [[Tetris.MAP_EMPTY] * Tetris.BOARD_WIDTH for _ in full_rows]
            board = padding + remaining
        return len(full_rows), board

    @staticmethod
    def _first_block_row(col):
        """Return the row index of the topmost block in a column, or BOARD_HEIGHT."""
        for row_index, cell in enumerate(col):
            if cell == Tetris.MAP_BLOCK:
                return row_index
        return Tetris.BOARD_HEIGHT

    def _number_of_holes(self, board):
        """Count empty cells that lie below the topmost block of their column."""
        holes = 0
        for col in zip(*board):
            top = Tetris._first_block_row(col)
            holes += sum(1 for cell in col[top + 1:] if cell == Tetris.MAP_EMPTY)
        return holes

    def _bumpiness(self, board):
        """Measure height differences between neighbouring columns.

        Returns:
            ``(sum_of_differences, largest_difference)``.
        """
        tops = [Tetris._first_block_row(col) for col in zip(*board)]
        diffs = [abs(left - right) for left, right in zip(tops, tops[1:])]
        return sum(diffs), max(diffs)

    def _height(self, board):
        """Return ``(sum of column heights, tallest column, BOARD_HEIGHT)``."""
        sum_height = 0
        max_height = 0
        for col in zip(*board):
            first_filled = 0
            while first_filled < Tetris.BOARD_HEIGHT and col[first_filled] == Tetris.MAP_EMPTY:
                first_filled += 1
            height = Tetris.BOARD_HEIGHT - first_filled
            sum_height += height
            max_height = max(max_height, height)
        return sum_height, max_height, Tetris.BOARD_HEIGHT

    def _get_board_props(self, board):
        """Return ``[lines, holes, total_bumpiness, sum_height]`` for ``board``.

        Full rows are cleared first, so the last three features describe the
        board after line clearing.
        """
        lines, board = self._clear_lines(board)
        holes = self._number_of_holes(board)
        total_bumpiness, _ = self._bumpiness(board)
        sum_height, _, _ = self._height(board)
        return [lines, holes, total_bumpiness, sum_height]

    def get_next_states(self):
        """Enumerate every legal drop of the current piece.

        Returns:
            Dict mapping ``(x, rotation)`` to the feature list of the board
            that results from that placement.
        """
        piece_id = self.current_piece
        # Skip rotations that repeat a shape already covered
        if piece_id == 6:
            rotations = [0]
        elif piece_id == 0:
            rotations = [0, 90]
        else:
            rotations = [0, 90, 180, 270]

        states = {}
        for rotation in rotations:
            piece = Tetris.TETROMINOS[piece_id][rotation]
            xs = [cell[0] for cell in piece]
            for x in range(-min(xs), Tetris.BOARD_WIDTH - max(xs)):
                pos = [x, 0]
                while not self._check_collision(piece, pos):
                    pos[1] += 1
                pos[1] -= 1
                # A landing row of -1 means the piece did not fit even at the top
                if pos[1] >= 0:
                    board = self._add_piece_to_board(piece, pos)
                    states[(x, rotation)] = self._get_board_props(board)
        return states

    def get_game_score(self):
        """Return the accumulated score of the current game."""
        return self.score

    def get_state_size(self):
        """Return the number of features in a state vector."""
        return 4

    def play(self, x, rotation, render=False, render_delay=None):
        """Drop the current piece at column offset ``x`` with ``rotation``.

        Args:
            x: Column offset of the piece.
            rotation: Rotation key into ``TETROMINOS`` (0, 90, 180 or 270).
            render: If true, draw every step of the fall.
            render_delay: Optional pause in seconds after each rendered step.

        Returns:
            ``(reward, game_over)``. The reward is
            ``1 + lines_cleared ** 2 * BOARD_WIDTH``, reduced by 2 when the
            next piece cannot spawn. If the requested placement does not fit
            on the board at all, the game ends and ``(-2, True)`` is returned
            with board and score left untouched.
        """
        self.current_pos = [x, 0]
        self.current_rotation = rotation
        piece = self._get_rotated_piece()

        # Let the piece fall until the next row would collide
        while not self._check_collision(piece, self.current_pos):
            if render:
                self.render()
                if render_delay:
                    sleep(render_delay)
            self.current_pos[1] += 1
        self.current_pos[1] -= 1

        if self.current_pos[1] < 0:
            # The piece collided at the very top. Writing it anyway would use
            # a negative row index and silently land in the bottom rows.
            self.current_pos[1] = 0
            self.game_over = True
            return -2, True

        # Lock the piece and clear completed rows
        self.board = self._add_piece_to_board(piece, self.current_pos)
        lines_cleared, self.board = self._clear_lines(self.board)

        score = 1 + (lines_cleared ** 2) * Tetris.BOARD_WIDTH
        self.score += score
        self._new_round()

        # The penalty lowers the returned reward only, not the game score
        if self.game_over:
            score -= 2
        return score, self.game_over

    def render(self):
        """Show the board, with the active piece and score, in an OpenCV window."""
        img = [Tetris.COLORS[p] for row in self._get_complete_board() for p in row]
        img = np.array(img).reshape(Tetris.BOARD_HEIGHT, Tetris.BOARD_WIDTH, 3).astype(np.uint8)
        # Reverse the channel order (RGB -> BGR) for OpenCV display
        img = img[..., ::-1]
        img = Image.fromarray(img, 'RGB')
        img = img.resize((Tetris.BOARD_WIDTH * 25, Tetris.BOARD_HEIGHT * 25), Image.NEAREST)
        img = np.array(img)
        cv2.putText(img, str(self.score), (22, 22), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 1)
        cv2.imshow('image', img)
        cv2.waitKey(1)

#DQN Agent

In [33]:
import numpy as np
import random
from collections import deque
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import Dense

class DQNAgent:
    """Deep Q-learning agent that scores states with a dense network.

    The network maps one state vector of length ``state_size`` to a single
    value. Experiences are kept in a bounded replay memory and sampled at
    random for training.

    Args:
        state_size: Length of the state vector fed to the network.
        mem_size: Maximum number of experiences kept in replay memory.
        discount: Weight of the predicted future value in the training target.
        epsilon: Initial probability of picking a random state.
        epsilon_min: Lower bound for ``epsilon``.
        epsilon_stop_episode: Number of training calls over which ``epsilon``
            decays linearly to ``epsilon_min``; 0 disables the decay.
        n_neurons: Units per hidden layer (defaults to ``DEFAULT_N_NEURONS``).
        activations: Activation per hidden layer followed by the output
            activation (defaults to ``DEFAULT_ACTIVATIONS``).
        loss: Loss passed to ``model.compile``.
        optimizer: Optimizer passed to ``model.compile``.
        replay_start_size: Minimum number of stored experiences before
            training starts; defaults to ``mem_size // 2``.
    """

    # Immutable defaults; each instance gets its own list copy. The output
    # activation 'linear' is a separate entry after the hidden activations.
    DEFAULT_N_NEURONS = (32, 32, 32)
    DEFAULT_ACTIVATIONS = ('relu', 'relu', 'relu', 'linear')

    def __init__(self, state_size, mem_size=10000, discount=0.99, epsilon=1.0, epsilon_min=0.0,
                 epsilon_stop_episode=0, n_neurons=None, activations=None,
                 loss='mse', optimizer='adam', replay_start_size=None):

        # Size of input vector
        self.state_size = state_size

        # Discount (how much the future vs present Q value matters)
        self.discount = discount

        # Epsilon (odds of a random move, 1.0 = 100%)
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min

        # Linear decay step applied once per effective train() call
        if epsilon_stop_episode > 0:
            self.epsilon_decay = (epsilon - epsilon_min) / epsilon_stop_episode
        else:
            self.epsilon_decay = 0

        # Replay memory; the oldest experiences drop out once it is full
        self.memory = deque(maxlen=mem_size)

        # Require a minimum number of memories before training starts
        self.replay_start_size = replay_start_size if replay_start_size else mem_size // 2

        # Neural network configuration
        self.n_neurons = list(DQNAgent.DEFAULT_N_NEURONS if n_neurons is None else n_neurons)
        self.activations = list(DQNAgent.DEFAULT_ACTIVATIONS if activations is None else activations)
        self.loss = loss
        self.optimizer = optimizer

        # Creates the model
        self.model = self.create_model()

    def create_model(self):
        """Build and compile the dense network described by the constructor args."""
        model = Sequential()

        # First hidden layer
        model.add(Dense(self.n_neurons[0], activation=self.activations[0], input_shape=(self.state_size,)))

        # Additional hidden layers
        for units, activation in zip(self.n_neurons[1:], self.activations[1:]):
            model.add(Dense(units, activation=activation))

        # Output layer: a single value, using the last activation in the list
        model.add(Dense(1, activation=self.activations[-1]))
        model.compile(loss=self.loss, optimizer=self.optimizer)
        return model

    def add_to_memory(self, state, next_state, reward, done):
        """Store one ``(state, next_state, reward, done)`` experience."""
        self.memory.append((state, next_state, reward, done))

    def predict_value(self, state):
        """Return the network output for a single state."""
        state_input = np.reshape(state, (1, self.state_size))
        return self.model.predict(state_input, verbose=0)[0]

    def best_state(self, states):
        """Pick a state: random with probability ``epsilon``, else highest valued.

        Args:
            states: Iterable of candidate state vectors.

        Returns:
            One element of ``states``.
        """
        if random.random() < self.epsilon:
            return random.choice(list(states))
        candidates = list(states)
        # Score all candidates in one predict call instead of one call each
        batch = np.reshape(candidates, (-1, self.state_size))
        values = self.model.predict(batch, verbose=0)
        # argmax returns the first maximum, matching max() on ties
        return candidates[int(np.argmax(values[:, 0]))]

    def train(self, batch_size=32, epochs=3):
        """Fit the network on a random minibatch from replay memory.

        Does nothing until the memory holds at least ``replay_start_size``
        experiences and at least ``batch_size`` of them (``random.sample``
        cannot draw more items than are stored). Each effective call also
        lowers ``epsilon`` by one decay step, never below ``epsilon_min``.
        """
        stored = len(self.memory)
        if stored < self.replay_start_size or stored < batch_size:
            return

        # Gets a random subset of memories
        minibatch = random.sample(self.memory, batch_size)

        current_states = np.array([sample[0] for sample in minibatch])
        next_states = np.array([sample[1] for sample in minibatch])

        # Predicted values of the follow-up states
        next_values = self.model.predict(next_states, verbose=0)

        # Target: the reward alone at game end, otherwise reward plus
        # discounted follow-up value
        targets = np.array([
            reward if done else reward + self.discount * next_values[idx][0]
            for idx, (_, _, reward, done) in enumerate(minibatch)
        ])
        self.model.fit(current_states, targets, batch_size=batch_size, epochs=epochs, verbose=0)

        # Lower epsilon, clamped so rounding cannot push it below the minimum
        if self.epsilon > self.epsilon_min:
            self.epsilon = max(self.epsilon_min, self.epsilon - self.epsilon_decay)



#Save as gif

In [34]:
def save_game_as_gif(agent, env, filename="best_run.gif", delay=100, max_frames=None):
    """Let ``agent`` play one game on ``env`` and save it as an animated GIF.

    The agent picks moves through ``agent.best_state``, so its current epsilon
    still applies and moves may be random. One frame is captured after every
    move.

    Args:
        agent: Object with a ``best_state(states)`` method.
        env: Tetris environment; it is reset before the game starts.
        filename: Path of the GIF to write.
        delay: Display time per frame, passed to the GIF writer as ``duration``.
        max_frames: Optional cap on recorded moves. ``None`` plays until game
            over, which can take very long (and hold every frame in memory)
            if the agent plays well.
    """
    # Size in pixels of one board cell in the output image
    cell_px = 25

    frames = []
    env.reset()
    done = False

    while not done and (max_frames is None or len(frames) < max_frames):
        # Map each resulting feature vector back to the action producing it
        actions_by_state = {tuple(v): k for k, v in env.get_next_states().items()}

        # Agent selects the next state (random or best based on current knowledge)
        chosen_state = agent.best_state(actions_by_state.keys())
        x, rotation = actions_by_state[chosen_state]

        # Plays the action and observes whether the game ended
        _, done = env.play(x, rotation)

        # Renders the current board (including the active piece) as an image
        pixels = [env.COLORS[p] for row in env._get_complete_board() for p in row]
        pixels = np.array(pixels).reshape(env.BOARD_HEIGHT, env.BOARD_WIDTH, 3).astype(np.uint8)
        frame = Image.fromarray(pixels, 'RGB')
        frame = frame.resize((env.BOARD_WIDTH * cell_px, env.BOARD_HEIGHT * cell_px), Image.NEAREST)
        frames.append(frame)

    # Nothing to write when the frame cap was 0
    if not frames:
        return

    # Saves all frames as GIF
    frames[0].save(filename, save_all=True, append_images=frames[1:], optimize=False, duration=delay, loop=0)


#DQN

In [35]:
def _plot_training_progress(episodes_list, scores, episode, window=20):
    """Save a PNG with the per-episode scores and their moving average."""
    plt.figure(figsize=(12, 6))
    plt.plot(episodes_list, scores, label="Score", color='blue')

    if len(scores) >= window:
        moving_avg = np.convolve(scores, np.ones(window) / window, mode='valid')
        plt.plot(episodes_list[window - 1:], moving_avg,
                 label=f"Moving Average ({window} episodes)", color='red')

    plt.xlabel("Episode")
    plt.ylabel("Score")
    plt.title("Tetris DQN Training Progress")
    plt.legend()
    plt.grid(True)
    plt.savefig(f"training_progress_{episode}.png")
    plt.close()


def dqn(episodes=3000, epsilon_stop_episode=2000, mem_size=1000, discount=0.99,
        batch_size=32, epochs=1, log_every=50):
    """Train a DQN agent on Tetris, logging progress and saving GIFs.

    Args:
        episodes: Number of games to play.
        epsilon_stop_episode: Number of training steps over which the agent's
            epsilon decays linearly to its minimum.
        mem_size: Capacity of the agent's replay memory.
        discount: Discount for future rewards (1 = more future based, 0 =
            more current).
        batch_size: Minibatch size for each training step.
        epochs: Epochs per training step.
        log_every: Print statistics and save a progress plot every this many
            episodes; a falsy value disables logging.
    """
    # Initializes environment
    env = Tetris()

    # Score history, one entry per finished episode
    scores = []
    episodes_list = []

    # Any real score beats this, so the first episode always sets the record
    best_score = float('-inf')

    # Initializes DQN agent
    tetrisagent = DQNAgent(
        env.get_state_size(),
        n_neurons=[32, 32, 32],
        # Models we found used relu so we stuck with that
        activations=['relu', 'relu', 'relu', 'linear'],
        epsilon_stop_episode=epsilon_stop_episode,
        mem_size=mem_size,
        discount=discount
    )

    # Training loop; tqdm shows progress
    for episode in tqdm(range(episodes)):
        current_state = env.reset()
        game_over = False
        while not game_over:

            # Gets possible states and chooses the best one (or random)
            next_states = {tuple(v): k for k, v in env.get_next_states().items()}
            best_state = tetrisagent.best_state(next_states.keys())
            best_action = next_states[best_state]

            # Does action and gets reward
            reward, game_over = env.play(best_action[0], best_action[1])

            # Stores info in memory
            tetrisagent.add_to_memory(current_state, best_state, reward, game_over)
            current_state = best_state

        # Records score
        episode_score = env.get_game_score()
        scores.append(episode_score)
        episodes_list.append(episode)

        # Trains agent once per finished episode
        tetrisagent.train(batch_size=batch_size, epochs=epochs)

        # Logs statistics and creates the plot (episode 0 is skipped)
        if log_every and episode and episode % log_every == 0:
            recent = scores[-log_every:]
            avg_score = mean(recent)
            min_score = min(recent)
            max_score = max(recent)
            print(f"Episode {episode} | Avg: {avg_score:.2f} | Min: {min_score} | Max: {max_score}")
            _plot_training_progress(episodes_list, scores, episode)

        # On a new record, record a fresh game played by the current agent
        if episode_score > best_score:
            best_score = episode_score
            gif_name = f"best_run_episode{episode}.gif"
            print(f"Saving a new best model (score={episode_score}, episode={episode})")
            save_game_as_gif(tetrisagent, Tetris(), filename=gif_name, delay=150)
            print(f"Saved best run as GIF: {gif_name}  (score={episode_score})")


#Start Model

In [36]:
# Start training only when executed directly (a notebook cell also runs as
# "__main__"), so importing this module does not launch a full training run.
if __name__ == "__main__":
    dqn()

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)
  0%|          | 1/3000 [00:00<07:50,  6.38it/s]

Saving a new best model (score=21, episode=0)
Saved best run as GIF: best_run_episode0.gif  (score=21


  0%|          | 4/3000 [00:00<05:30,  9.05it/s]

Saving a new best model (score=23, episode=3)
Saved best run as GIF: best_run_episode3.gif  (score=23


  0%|          | 9/3000 [00:00<03:58, 12.56it/s]

Saving a new best model (score=25, episode=7)
Saved best run as GIF: best_run_episode7.gif  (score=25


  1%|          | 24/3000 [00:03<06:52,  7.22it/s]


KeyboardInterrupt: 