In [1]:
from random import random, randint, sample

import numpy as np
import torch
import torch.nn as nn

import cv2
from tetris import Tetris
from collections import deque

## Deep Q Learning Network

In [None]:
class DQN(nn.Module):
    """Fully connected value network mapping 4 input features to one scalar.

    The ``conv1``/``conv2``/``conv3`` attribute names are retained even though
    the layers are linear, because they determine the ``state_dict`` keys.
    """

    def __init__(self):
        super().__init__()

        hidden_units = 64
        self.conv1 = nn.Sequential(nn.Linear(4, hidden_units), nn.ReLU(inplace=True))
        self.conv2 = nn.Sequential(nn.Linear(hidden_units, hidden_units), nn.ReLU(inplace=True))
        self.conv3 = nn.Sequential(nn.Linear(hidden_units, 1))

        self._create_weights()

    def _create_weights(self):
        """Give every linear layer Xavier-uniform weights and zero biases."""
        linear_layers = (layer for layer in self.modules() if isinstance(layer, nn.Linear))
        for layer in linear_layers:
            nn.init.xavier_uniform_(layer.weight)
            nn.init.constant_(layer.bias, 0)

    def forward(self, x):
        """Pass ``x`` through the three stages in order and return the result."""
        for stage in (self.conv1, self.conv2, self.conv3):
            x = stage(x)
        return x

In [2]:
# DQN for modified environment with increased state space

class DQN(nn.Module):
    """Fully connected value network for the expanded state representation.

    Layout: ``input_size`` -> 128 -> 128 -> 64 -> 1. The first two hidden
    stages apply ReLU followed by batch normalisation; the third applies ReLU
    only. Attribute names and layer positions inside each ``nn.Sequential``
    are kept stable because they determine the ``state_dict`` keys.

    Args:
        input_size: Number of features in each state vector (default 8).
    """

    def __init__(self, input_size=8):
        super().__init__()

        self.conv1 = self._hidden_stage(input_size, 128, normalize=True)
        self.conv2 = self._hidden_stage(128, 128, normalize=True)
        self.conv3 = self._hidden_stage(128, 64, normalize=False)

        # Final projection to a single scalar per input row.
        self.output = nn.Linear(64, 1)

        self._initialize_weights()

    @staticmethod
    def _hidden_stage(in_features, out_features, normalize):
        """Build Linear -> ReLU, optionally followed by BatchNorm1d."""
        layers = [nn.Linear(in_features, out_features), nn.ReLU(inplace=True)]
        if normalize:
            layers.append(nn.BatchNorm1d(out_features))
        return nn.Sequential(*layers)

    def _initialize_weights(self):
        """Give every linear layer Xavier-uniform weights and zero biases."""
        linear_layers = (layer for layer in self.modules() if isinstance(layer, nn.Linear))
        for layer in linear_layers:
            nn.init.xavier_uniform_(layer.weight)
            nn.init.constant_(layer.bias, 0)

    def forward(self, x):
        """Run ``x`` through the three hidden stages and the output layer."""
        for stage in (self.conv1, self.conv2, self.conv3, self.output):
            x = stage(x)
        return x


## Global Constants

In [8]:

WIDTH = 10  # Board width, passed to Tetris(width=...)
HEIGHT = 20  # Board height, passed to Tetris(height=...)
BLOCK_SIZE = 30  # Block size when rendering (presumably pixels per cell - confirm in tetris.py)
BATCH_SIZE = 512  # Maximum number of replay transitions sampled per optimisation step
LEARNING_RATE = 1e-3  # Learning rate handed to the Adam optimizer
GAMMA = 0.99  # Discount applied to the predicted value of the next state
INITIAL_EPSILON = 1.0  # Exploration probability at epoch 0
FINAL_EPSILON = 1e-3  # Exploration probability once the decay has finished
NUM_DECAY_EPOCHS = 1350  # Epochs over which epsilon falls linearly from initial to final
NUM_EPOCHS = 3000  # Total optimisation steps; train() performs at most one per finished game
SAVE_INTERVAL = 250  # Write a checkpoint every this many epochs
REPLAY_MEMORY_SIZE = 22000  # Replay buffer capacity; training waits until it is 10% full

# Use the GPU when one is available, otherwise fall back to the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
DEVICE

device(type='cuda')

## Training Loop Using Epsilon Greedy and Experience Replay

In [None]:
def train(render=True):
    """Train a DQN on Tetris with epsilon-greedy exploration and experience replay.

    Each loop iteration places one piece. Every placement is appended to the
    replay buffer; one optimisation step (an "epoch") is performed each time a
    game ends, once the buffer holds at least a tenth of its capacity. The
    weights are written to ``policy_net.pth`` every ``SAVE_INTERVAL`` epochs and
    once more when training finishes.

    Args:
        render: Forwarded to ``env.step(..., render=...)``. Defaults to True,
            which matches the previous hard-coded behaviour.

    Returns:
        The trained ``DQN`` instance (left on ``DEVICE``).
    """
    # The notebook's import cell binds the bare name `random` to the function
    # random.random, so the module itself is imported here under another name.
    import random as python_random

    # Exploration (random/randint) and replay sampling (sample) draw from
    # Python's global generator, so seeding torch alone does not make a run
    # repeatable. NumPy is seeded as well in case the environment uses it
    # (NOTE(review): confirm which generator tetris.py draws pieces from).
    python_random.seed(42)
    np.random.seed(42)
    torch.manual_seed(42)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(42)

    env = Tetris(width=WIDTH, height=HEIGHT, block_size=BLOCK_SIZE)
    model = DQN().to(DEVICE)
    optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
    criterion = nn.MSELoss()

    state = env.reset().to(DEVICE)
    replay_memory = deque(maxlen=REPLAY_MEMORY_SIZE)
    epoch = 0

    while epoch < NUM_EPOCHS:
        next_steps = env.get_next_states()

        # Linear decay from INITIAL_EPSILON to FINAL_EPSILON over the first
        # NUM_DECAY_EPOCHS epochs, constant afterwards.
        epsilon = FINAL_EPSILON + (max(NUM_DECAY_EPOCHS - epoch, 0) *
                                   (INITIAL_EPSILON - FINAL_EPSILON) / NUM_DECAY_EPOCHS)
        next_actions, next_states = zip(*next_steps.items())
        next_states = torch.stack(next_states).to(DEVICE)

        # Score every candidate placement in eval mode so BatchNorm uses its
        # running statistics rather than the statistics of this small batch.
        model.eval()
        with torch.no_grad():
            predictions = model(next_states)[:, 0]
        model.train()

        if random() <= epsilon:
            index = randint(0, len(next_steps) - 1)
        else:
            index = torch.argmax(predictions).item()

        # next_states already lives on DEVICE, so the selected row does too.
        next_state = next_states[index, :]
        action = next_actions[index]

        reward, done = env.step(action, render=render)
        replay_memory.append([state, reward, next_state, done])

        if not done:
            state = next_state
            continue

        # The game just ended: remember its statistics for the log line below
        # and start a fresh game.
        final_score = env.score
        final_tetrominoes = env.tetrominoes
        final_cleared_lines = env.cleared_lines
        state = env.reset().to(DEVICE)

        # Keep collecting experience until the buffer is 10% full.
        if len(replay_memory) < REPLAY_MEMORY_SIZE / 10:
            continue

        epoch += 1
        batch = sample(replay_memory, min(len(replay_memory), BATCH_SIZE))
        state_batch, reward_batch, next_state_batch, done_batch = zip(*batch)
        state_batch = torch.stack(state_batch).to(DEVICE)
        reward_batch = torch.from_numpy(np.array(reward_batch, dtype=np.float32)[:, None]).to(DEVICE)
        next_state_batch = torch.stack(next_state_batch).to(DEVICE)
        done_mask = torch.tensor(done_batch, dtype=torch.bool, device=DEVICE)[:, None]

        q_values = model(state_batch)
        model.eval()
        with torch.no_grad():
            next_prediction_batch = model(next_state_batch)
        model.train()

        # TD target: the bare reward for terminal transitions, otherwise the
        # reward plus the discounted value predicted for the next state.
        y_batch = torch.where(
            done_mask,
            reward_batch,
            reward_batch + GAMMA * next_prediction_batch,
        )

        optimizer.zero_grad()
        loss = criterion(q_values, y_batch)
        loss.backward()
        optimizer.step()

        print("Epoch: {}/{}, Action: {}, Score: {}, Tetrominoes {}, Cleared lines: {}".format(
            epoch,
            NUM_EPOCHS,
            action,
            final_score,
            final_tetrominoes,
            final_cleared_lines))

        # epoch is at least 1 here, so no separate "epoch > 0" guard is needed.
        if epoch % SAVE_INTERVAL == 0:
            torch.save(model.state_dict(), "policy_net.pth")

    torch.save(model.state_dict(), "policy_net.pth")
    return model


In [9]:
# Run the full training loop; train() returns the trained model and also saves its weights to policy_net.pth.
agent_tetris = train()

Epoch: 1/3000, Action: (5, 1), Score: 13, Tetrominoes 15, Cleared lines: 0
Epoch: 2/3000, Action: (7, 3), Score: 15, Tetrominoes 17, Cleared lines: 0
Epoch: 3/3000, Action: (2, 2), Score: 21, Tetrominoes 23, Cleared lines: 0
Epoch: 4/3000, Action: (4, 1), Score: 16, Tetrominoes 18, Cleared lines: 0
Epoch: 5/3000, Action: (5, 1), Score: 16, Tetrominoes 18, Cleared lines: 0
Epoch: 6/3000, Action: (6, 1), Score: 10, Tetrominoes 12, Cleared lines: 0
Epoch: 7/3000, Action: (4, 1), Score: 19, Tetrominoes 21, Cleared lines: 0
Epoch: 8/3000, Action: (1, 0), Score: 10, Tetrominoes 12, Cleared lines: 0
Epoch: 9/3000, Action: (2, 1), Score: 21, Tetrominoes 23, Cleared lines: 0
Epoch: 10/3000, Action: (2, 0), Score: 19, Tetrominoes 21, Cleared lines: 0
Epoch: 11/3000, Action: (8, 3), Score: 20, Tetrominoes 22, Cleared lines: 0
Epoch: 12/3000, Action: (3, 0), Score: 17, Tetrominoes 19, Cleared lines: 0
Epoch: 13/3000, Action: (4, 1), Score: 20, Tetrominoes 22, Cleared lines: 0
Epoch: 14/3000, Actio

KeyboardInterrupt: 

## Record a Single Game with Trained Agent

In [None]:
# Constants
# WIDTH, HEIGHT, BLOCK_SIZE and DEVICE repeat the values of the Global Constants cell.
WIDTH = 10  # Width of the board
HEIGHT = 20  # Height of the board
BLOCK_SIZE = 30  # Block size when rendering
FPS = 100  # Frames per second for video output

OUTPUT_VIDEO = "demo.mp4"  # Path of the video written by record_game

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

agent_tetris = DQN().to(DEVICE)
# map_location remaps the stored tensors onto DEVICE, so a checkpoint saved on
# a GPU also loads on a CPU-only machine. weights_only=True restricts
# unpickling to tensor/state-dict data, which is all this file contains.
agent_tetris.load_state_dict(
    torch.load("policy_net.pth", map_location=DEVICE, weights_only=True)
)

  agent_tetris.load_state_dict(torch.load("expanded_policy_net.pth"))


<All keys matched successfully>

In [11]:
def record_game(model):
    """Play one greedy game of Tetris with ``model`` and save it as a video.

    At every step the model scores all candidate placements returned by
    ``env.get_next_states()`` and the highest-scoring one is played. Frames are
    written to ``OUTPUT_VIDEO`` through a ``cv2.VideoWriter`` that is handed to
    ``env.step``.

    Args:
        model: The network to evaluate. It is switched to eval mode and moved
            to ``DEVICE`` in place.
    """
    # The notebook's import cell binds the bare name `random` to the function
    # random.random, so the module itself is imported here under another name.
    import random as python_random

    # Set random seeds for reproducibility. The model is deterministic in eval
    # mode, so any variation comes from the environment; Python's and NumPy's
    # generators are seeded too in case it uses them
    # (NOTE(review): confirm which generator tetris.py draws pieces from).
    python_random.seed(123)
    np.random.seed(123)
    torch.manual_seed(123)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(123)

    model.eval()  # Set the model to evaluation mode
    model.to(DEVICE)

    env = Tetris(width=WIDTH, height=HEIGHT, block_size=BLOCK_SIZE)
    env.reset()

    video_size = (int(1.5 * WIDTH * BLOCK_SIZE), HEIGHT * BLOCK_SIZE)
    video_writer = cv2.VideoWriter(OUTPUT_VIDEO, cv2.VideoWriter_fourcc(*"mp4v"), FPS, video_size)

    # try/finally guarantees the writer is released (finalising the file) and
    # the OpenCV windows are closed even if a step raises part-way through.
    try:
        done = False
        while not done:
            next_steps = env.get_next_states()
            next_actions, next_states = zip(*next_steps.items())
            next_states = torch.stack(next_states).to(DEVICE)

            with torch.no_grad():
                predictions = model(next_states)[:, 0]
            best_action_index = torch.argmax(predictions).item()
            action = next_actions[best_action_index]

            _, done = env.step(action, render=True, video=video_writer)
    finally:
        video_writer.release()
        cv2.destroyAllWindows()

    print("Game over. Video saved to:", OUTPUT_VIDEO)


In [12]:
# Play one greedy game with the loaded agent and write the frames to OUTPUT_VIDEO.
record_game(agent_tetris)

Game over. Video saved to: expanded_demo.mp4


## Get Agent Stats for 20 Games

In [None]:
# NOTE(review): evaluate_model below never reads FPS; this assignment only rebinds the
# global that record_game passes to cv2.VideoWriter, so it affects later re-runs of that cell.
FPS = 300
def evaluate_model(model, num_games=20):
    """Play ``num_games`` greedy games and report the average statistics.

    At every step the model scores all candidate placements returned by
    ``env.get_next_states()`` and the highest-scoring one is played. The
    environment's ``score``, ``tetrominoes`` and ``cleared_lines`` are read
    once each game has ended and averaged over all games.

    Args:
        model: The network to evaluate. It is switched to eval mode and moved
            to ``DEVICE`` in place.
        num_games: Number of games to play; must be at least 1.

    Returns:
        Tuple ``(avg_score, avg_tetrominoes, avg_lines_cleared)``.

    Raises:
        ValueError: If ``num_games`` is smaller than 1.
    """
    # Reject an empty evaluation up front instead of failing with a
    # ZeroDivisionError after the loop.
    if num_games < 1:
        raise ValueError(f"num_games must be at least 1, got {num_games}")

    model.eval()
    # The candidate states are moved to DEVICE below, so the model must live
    # there as well (record_game does the same).
    model.to(DEVICE)
    env = Tetris(width=WIDTH, height=HEIGHT, block_size=BLOCK_SIZE)

    total_score = 0
    total_tetrominoes = 0
    total_lines_cleared = 0

    for _ in range(num_games):
        env.reset()

        done = False
        while not done:
            next_steps = env.get_next_states()
            next_actions, next_states = zip(*next_steps.items())
            next_states = torch.stack(next_states).to(DEVICE)

            with torch.no_grad():
                predictions = model(next_states)[:, 0]
            best_action_index = torch.argmax(predictions).item()
            action = next_actions[best_action_index]

            _, done = env.step(action)

        # Accumulate the statistics of the finished game.
        total_score += env.score
        total_tetrominoes += env.tetrominoes
        total_lines_cleared += env.cleared_lines

    # Calculate averages
    avg_score = total_score / num_games
    avg_tetrominoes = total_tetrominoes / num_games
    avg_lines_cleared = total_lines_cleared / num_games

    print("\nEvaluation Results:")
    print(f"Average Score: {avg_score}")
    print(f"Average Tetrominoes: {avg_tetrominoes}")
    print(f"Average Lines Cleared: {avg_lines_cleared}")

    return avg_score, avg_tetrominoes, avg_lines_cleared


In [15]:
# Average score, pieces placed and lines cleared over the default 20 games.
avg_score, avg_tetrominoes, avg_lines_cleared = evaluate_model(agent_tetris)


Evaluation Results:
Average Score: 446.3
Average Tetrominoes: 118.3
Average Lines Cleared: 33.0
