*Cell with all imports for notebook*

In [7]:
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
import torch.nn.functional as F
import socket
import copy
import math

# Training a neural network to play the 2048 game with PyTorch

*by Chad Schwenke and Logan Cadman, December, 2023* 

## Introduction

Our goal was to use machine learning to train a neural network to play the game 2048. The game is played on a 4x4 board that starts with two tiles, each a 2 or a 4, in random positions. On each turn the player slides all tiles up, down, left, or right. Two tiles merge only when they carry the same number, and the merged tile holds their sum, which is also added to the score. After every valid move a new 2 or 4 tile appears in a random empty position. It is a strategic movement game in which you combine tiles to create ever higher tiles on the board and increase your score.

Training a neural network to play 2048 seemed like an interesting endeavor for multiple reasons. First, our inspiration for this project comes from assignment 5 in class, where we trained a reinforcement learning agent to play tic-tac-toe. That assignment showed us that training a neural network to play a simple game could be quite interesting, because it lets us experiment with different machine learning concepts and gives us a practical and fun way to apply what we've learned in class.

At first glance 2048 is a relatively simple game, but it is more complex than tic-tac-toe. It has clear objectives and easily measurable outcomes such as the score, the highest tile, and the number of tiles merged. That makes it a good platform for experimenting with a multitude of network structures, learning strategies, optimization methods, and reward systems. The game also requires strategy and some planning to achieve high scores. Reinforcement learning agents typically use a 'discount factor', which we call 'gamma' in our code. It determines whether the agent favors immediate rewards, future rewards, or a balance between the two. Lowering gamma makes the agent more short-sighted and raising it makes the agent weigh long-term rewards more heavily, so comparing runs with different values of gamma shows whether short-term or long-term thinking is more beneficial in 2048.
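To make the role of gamma concrete, the sketch below computes a discounted return for a made-up reward sequence. The function name `discounted_return` and the reward values are assumptions chosen for illustration; they are not part of our training code.

```python
def discounted_return(rewards, gamma):
    """Sum of rewards where the reward at step t is weighted by gamma ** t."""
    total = 0.0
    for t, reward in enumerate(rewards):
        total += (gamma ** t) * reward
    return total

# Hypothetical episode: no merges for three moves, then a move with 4 merges
rewards = [0, 0, 0, 4]

for gamma in (0.5, 0.9, 0.99):
    print(gamma, discounted_return(rewards, gamma))
```

With a small gamma the delayed reward contributes very little, so an agent trained that way has little incentive to set up merges several moves ahead.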

While looking at the tic-tac-toe reinforcement learning code we realized that it stored a set of all possible game boards. That is not possible in 2048: each tile position has 18 possible states, and a 4x4 board has 16 positions, which leads us to believe that there are up to 18^16 possible board combinations, far too many to store in a set. We realized it would be possible to store a partial set of boards, but we did not end up doing this. Instead, as the agent plays 2048 it creates its own data and then learns from that data to improve its actions.
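The size of that state space can be checked with a few lines of arithmetic. This is a rough sketch that assumes the 18 states per cell are "empty" plus the powers of two from 2 up to 131,072 (2^17). The result is an upper bound, since many of these combinations can never be reached in a real game, and the one-byte-per-cell storage estimate is likewise only an assumption.

```python
# Each cell is either empty or holds a power of two from 2**1 up to 2**17
values_per_cell = 1 + 17
cells = 4 * 4

upper_bound = values_per_cell ** cells
print(f"{upper_bound:.3e} possible boards (upper bound)")

# Rough memory estimate if every board were stored as 16 one-byte exponents
bytes_needed = upper_bound * cells
print(f"about {bytes_needed / 1e18:.0f} exabytes")
```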

After doing a bit of work on this project we decided not to adapt the A5 code for 2048. This was due to a combination of factors, including the fact that the A5 reinforcement learning agent relied on a set of the possible boards. We also would not have learned as much by adapting that code; writing our own code in PyTorch helped us learn and better understand machine learning. Finally, we did not want to reinvent the wheel with a second implementation in TensorFlow, because it would have been nearly identical and our goal was to learn reinforcement learning, not to compare frameworks.

With so many possible combinations it appears that the maximum tile for 2048 would be either 65,536 or 131,072. Because these are so high and neither of us has ever achieved a tile even close to that, it piqued our interest even more. Would it be possible for a neural network agent to reach such a tile? This is an interesting question because that tile would be extremely hard for a human to achieve.

We ended up using a few different network designs: 'QNetworkSimple', a simple fully connected network; 'QNetworkDropout', which is similar to the simple network but adds dropout layers; and 'QNetworkConv', a convolutional neural network. We tried a few other designs too, but these are the ones we used for the majority of our tests.

In [None]:
# Set device to GPU if available
device = "cuda" if torch.cuda.is_available() else ("mps" if torch.backends.mps.is_available() else "cpu")

In [None]:
class QNetworkSimple(nn.Module):
    def __init__(self):
        super(QNetworkSimple, self).__init__()
        self.fc1 = nn.Linear(16, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 4)

    def forward(self, x):
        x = x.to(device)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

This simple network is fully connected and has three layers: fc1, fc2, and fc3. Its input is the original 4x4 game board flattened into a vector of 16 values. The first layer maps the 16 inputs to 128 outputs, the second maps 128 to 64, and the last maps 64 to 4, one output per move direction. In the forward function, 'x = x.to(device)' moves the input tensor to the selected device, so that a GPU is used if one is available. We then apply the relu function after the first two layers so that the network is non-linear, and leave the last layer without an activation so that it serves as the output.
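As a quick sanity check on the size of this design, the sketch below counts its trainable parameters by hand. It assumes each `nn.Linear` layer keeps its default bias term; the helper `linear_params` is a hypothetical name used only here.

```python
def linear_params(in_features, out_features):
    """Weights plus biases of one fully connected layer."""
    return in_features * out_features + out_features

# Layer sizes copied from QNetworkSimple: 16 -> 128 -> 64 -> 4
layer_sizes = [16, 128, 64, 4]

total = 0
for in_f, out_f in zip(layer_sizes, layer_sizes[1:]):
    count = linear_params(in_f, out_f)
    print(f"{in_f:>3} -> {out_f:<3}: {count} parameters")
    total += count

print("total:", total)
```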

In [None]:
class QNetworkDropout(nn.Module):
    def __init__(self):
        super(QNetworkDropout, self).__init__()
        self.fc1 = nn.Linear(16, 64)
        self.fc2 = nn.Linear(64, 128)
        self.dropout = nn.Dropout(p=0.5)
        self.fc3 = nn.Linear(128, 64)
        self.fc4 = nn.Linear(64, 4)

    def forward(self, x):
        x = x.to(device)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = F.relu(self.fc2(x))
        x = self.dropout(x)
        x = F.relu(self.fc3(x))
        x = self.fc4(x)
        return x

This network design is very similar to the previous one but adds dropout layers, which help with regularization. There are three hidden layers followed by one output layer. The dropout layer, declared with p=0.5, randomly sets each activation to zero with probability 0.5 during training, which discourages the network from relying too heavily on any single unit. In the forward function we again use relu to add non-linearity, and we apply dropout after each of the first two hidden layers. Dropout is only active while the module is in training mode; calling eval() on the network disables it.
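The effect of a dropout layer can be illustrated without PyTorch. The sketch below is a plain-Python approximation of inverted dropout, which zeroes each activation with probability p and rescales the survivors by 1 / (1 - p). The function name and the `rng` parameter are assumptions made for this example, not the library's implementation.

```python
import random

def dropout(values, p=0.5, training=True, rng=random):
    """Inverted dropout on a list of activations."""
    if not training:
        return list(values)          # identity at evaluation time
    keep_scale = 1.0 / (1.0 - p)     # rescale survivors so the expected value is unchanged
    return [0.0 if rng.random() < p else v * keep_scale for v in values]

random.seed(0)
activations = [1.0] * 8
print(dropout(activations, p=0.5))                   # zeroed entries mixed with doubled ones
print(dropout(activations, p=0.5, training=False))   # unchanged
```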

In [None]:
class QNetworkConv(nn.Module):
    def __init__(self):
        super(QNetworkConv, self).__init__()
        self.conv_block = ConvBlock(input_dim=1, output_dim=32) # Adjust the input dim accordingly
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(4*4*32, 128)  # Adjust the size accordingly
        self.fc2 = nn.Linear(128, 4)  # Final layer for 4 decisions

    def forward(self, x):
        x = x.to(device)
        x = self.conv_block(x)
        x = self.flatten(x)
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x
    
class ConvBlock(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(ConvBlock, self).__init__()
        self.input_dim = input_dim
        d = output_dim // 4
        self.conv1 = nn.Conv2d(input_dim, d, kernel_size=1, padding='same')
        self.conv2 = nn.Conv2d(input_dim, d, kernel_size=2, padding='same')
        self.conv3 = nn.Conv2d(input_dim, d, kernel_size=3, padding='same')
        self.conv4 = nn.Conv2d(input_dim, d, kernel_size=4, padding='same')
        self.relu = nn.ReLU()

    def forward(self, x):
        # Reshape input to [batch_size, channels, height, width]
        if len(x.shape) == 1:  # Unbatched input
            x = x.view(1, self.input_dim, 4, 4)  # Assuming input is a flattened 4x4 board
        elif len(x.shape) == 2:  # Batched input
            x = x.view(-1, self.input_dim, 4, 4)  # -1 for batch size

        x1 = self.relu(self.conv1(x))
        x2 = self.relu(self.conv2(x))
        x3 = self.relu(self.conv3(x))
        x4 = self.relu(self.conv4(x))
        return torch.cat((x1, x2, x3, x4), dim=1)

This section of code contains two classes: 'QNetworkConv', a convolutional neural network, and the 'ConvBlock' module that it depends on. The network consists of a convolutional block, a flatten layer, and two fully connected layers. Its forward method passes the input through the convolutional block, flattens the result, and then applies the fully connected layers in the same way as the previous designs. The convolutional block contains four parallel convolutions (conv1 through conv4) with kernel sizes 1 through 4, each producing output_dim // 4 = 8 channels at the same 4x4 resolution because of padding='same'. The forward function in 'ConvBlock' first reshapes its input to [batch_size, channels, 4, 4], then applies each convolution followed by the relu activation function, and concatenates the four results along the channel dimension. This gives 32 channels, or 4*4*32 = 512 values after flattening.
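To show why padding='same' keeps the 4x4 resolution, here is a small plain-Python sketch of a zero-padded convolution (strictly a cross-correlation, as in `nn.Conv2d`) with an odd kernel size. The function `conv2d_same`, the example board, and the all-ones kernel are assumptions for illustration; in the real network the kernel weights are learned.

```python
def conv2d_same(board, kernel):
    """Cross-correlate a square board with an odd-sized kernel using zero padding."""
    n, k = len(board), len(kernel)
    pad = k // 2
    out = [[0.0] * n for _ in range(n)]
    for i in range(n):
        for j in range(n):
            total = 0.0
            for di in range(k):
                for dj in range(k):
                    r, c = i + di - pad, j + dj - pad
                    if 0 <= r < n and 0 <= c < n:   # cells outside the board count as zero
                        total += board[r][c] * kernel[di][dj]
            out[i][j] = total
    return out

# Board in the log2 encoding used by get_normalized_flattened_board (2 -> 1, 4 -> 2, ...)
board = [[1, 0, 0, 0],
         [2, 1, 0, 0],
         [3, 0, 0, 0],
         [0, 0, 0, 0]]
kernel = [[1, 1, 1]] * 3   # hypothetical 3x3 kernel that sums each neighborhood

feature_map = conv2d_same(board, kernel)
print(len(feature_map), len(feature_map[0]))   # spatial size is still 4 x 4

# Four branches of 32 // 4 = 8 channels each are concatenated before flattening
branches, channels_per_branch = 4, 32 // 4
print(branches * channels_per_branch * 4 * 4)  # matches the 4*4*32 input of fc1
```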

## Methods

This project required several components. First, we needed a 2048 game board, which is defined as 'class Board' and contains everything related to the game. We then created a class 'ReplayBuffer', which stores the data generated by the agent so that it can be revisited during training. After that we have our training loop, an evaluation function for the trained model, and lastly a random evaluation model that serves as a baseline. The code and explanations for these are below.

In [None]:
class Board:
    def __init__(self):
        # Declare variables for score, last added tile position, merges made in last move, and game over flag
        self.score = 0
        self.last_added_tile = None
        self.merges_in_last_move = 0
        self.game_over = False

        # Initialize board with all zeros (4x4 grid) and then add two tiles
        self.board = [[0] * 4 for _ in range(4)]
        self._add_new_tile()
        self._add_new_tile()

    def print_board(self):
        # Iterates through each row of the board and prints it
        for row in self.board:
            print(' '.join(map(str, row)))

    def print_score(self):
        # Outputs the current score
        print(f"Score: {self.score}")

    def print_highest_tile(self):
        # Outputs the current highest tile
        print(f"Highest Tile: {self.highest_tile()}")

    def move_tiles(self, direction):
        # Check if game is already over
        if self.game_over:
            return False

        # Reset merge count to zero
        self.merges_in_last_move = 0

        # Make a copy of the original board
        original_board = [row[:] for row in self.board]

        # Transpose board for up and down moves
        if direction in ('u', 'd'):
            self._transpose_board()

        # Run logic for all rows or columns in move
        for i in range(4):
            # Up and left are treated the same after transpose
            if direction in ('u', 'l'):
                shifted_row = self._shift(self.board[i])
                self.board[i] = self._merge(shifted_row)
            # Down and right must be reversed first since functions treat everything as left
            elif direction in ('d', 'r'):
                reversed_row = list(reversed(self.board[i]))
                shifted_row = self._shift(reversed_row)
                merged_row = self._merge(shifted_row)
                # Undo reverse
                self.board[i] = list(reversed(merged_row))

        # Run transpose again to reset
        if direction in ('u', 'd'):
            self._transpose_board()

        # Check if board changed from original
        if self.board != original_board:
            # Add a new tile and then check if game is over
            self._add_new_tile()
            if not self._moves_available():
                self.game_over = True
            return True
        # If board did not change from original, the move was invalid but game is not yet over
        return False

    def possible_moves(self):
        # Create lists to store moves and try all moves
        possible_moves = []
        directions = ['u', 'd', 'l', 'r']

        # Try all possible directions
        for direction in directions:
            if self._simulate_move(direction):
                possible_moves.append(direction)

        # Return list of directions
        return possible_moves

    def get_flattened_board(self):
        flattened_board = []
        for row in self.board:
            for value in row:
                # Traverse board and append each value
                flattened_board.append(value)
        return flattened_board

    def get_normalized_flattened_board(self):
        normalized_flattened_board = []
        for row in self.board:
            for value in row:
                # Traverse board and append the base-2 logarithm of each value (zeros stay zero)
                normalized_value = math.log(value, 2) if value != 0 else 0
                normalized_flattened_board.append(normalized_value)
        return normalized_flattened_board

    def _transpose_board(self):
        # Converts rows to columns and columns to rows
        transposed = []
        for col_index in range(4):
            new_row = []
            for row in self.board:
                new_row.append(row[col_index])
            transposed.append(new_row)
        self.board = transposed

    def _shift(self, row):
        # Shifts non-zero elements to the left in a row
        shifted_row = []
        for value in row:
            if value != 0:
                shifted_row.append(value)
        # Append zeros to the end of the row to maintain its size
        while len(shifted_row) < 4:
            shifted_row.append(0)
        return shifted_row

    def _merge(self, row):
        # Merge adjacent tiles with the same value
        for i in range(3):
            if row[i] == row[i + 1] and row[i] != 0:
                row[i] *= 2
                row[i + 1] = 0
                self.score += row[i]
                # Update the number of merges
                self.merges_in_last_move += 1
        # Shift again to ensure tiles are properly aligned
        return self._shift(row)

    def _add_new_tile(self):
        # Check for possible empty positions
        empty_positions = []
        for i in range(4):
            for j in range(4):
                # If cell is zero it is empty
                if self.board[i][j] == 0:
                    empty_positions.append((i, j))
        if empty_positions:
            i, j = random.choice(empty_positions)
            # Add a 2 or 4 to a random empty position (10% chance to be 4, 90% chance to be 2)
            self.board[i][j] = 4 if random.random() < 0.1 else 2
            self.last_added_tile = (i, j)

    def _moves_available(self):
        # Check if there are any moves available
        for i in range(4):
            for j in range(4):
                # Check for empty spot
                if self.board[i][j] == 0:
                    return True
                # Check for a possible merge with the tile below (same column)
                if i < 3 and self.board[i][j] == self.board[i + 1][j]:
                    return True
                # Check for a possible merge with the tile to the right (same row)
                if j < 3 and self.board[i][j] == self.board[i][j + 1]:
                    return True
        return False

    def _simulate_move(self, direction):
        # Make a copy of the whole object before simulating move
        board_copy = copy.deepcopy(self)

        # Simulate the move on the copy
        board_copy.move_tiles(direction)

        # Return whether or not the board changed
        return board_copy.board != self.board
    
    def highest_tile(self):
        # Returns the highest tile on the board
        highest_tile = 0
        for row in self.board:
            for value in row:
                if value > highest_tile:
                    highest_tile = value
        return highest_tile

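The 'Board' class holds the full game state: the 4x4 grid, the score, the position of the last added tile, the number of merges made by the last move, and a game-over flag. 'move_tiles' handles all four directions with the same left-oriented '_shift' and '_merge' helpers by transposing the board for vertical moves and reversing each row for down and right moves. A move only counts if it changes the board, in which case a new tile (a 2 with 90% probability, otherwise a 4) is added and the game-over check runs. 'possible_moves' finds the legal directions by trying each move on a deep copy of the board, and 'get_normalized_flattened_board' returns the 16 tile values as base-2 logarithms (with empty positions as 0), which is the state representation we pass to the networks.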
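The row logic inside 'Board' can be tried out on its own. The sketch below re-implements the shift and merge steps as standalone functions; `shift` and `merge_left` are hypothetical names that mirror '_shift' and '_merge' but return the score gained instead of updating an object. The last line shows the log2 encoding applied to a single row.

```python
import math

def shift(row):
    """Slide non-zero tiles to the left and pad with zeros."""
    tiles = [v for v in row if v != 0]
    return tiles + [0] * (len(row) - len(tiles))

def merge_left(row):
    """Shift, merge equal neighbors once from left to right, then shift again."""
    row = shift(row)
    gained = 0
    for i in range(len(row) - 1):
        if row[i] != 0 and row[i] == row[i + 1]:
            row[i] *= 2
            row[i + 1] = 0
            gained += row[i]
    return shift(row), gained

print(merge_left([2, 2, 2, 2]))   # two separate merges, not one tile of 8
print(merge_left([4, 0, 4, 8]))
print(merge_left([0, 2, 0, 4]))

# log2 encoding of a row, with empty cells kept at 0
print([int(math.log2(v)) if v else 0 for v in [0, 2, 4, 2048]])
```

Moves to the right or down reuse the same routine on a reversed or transposed row, which is why 'move_tiles' only needs one merge implementation.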

In [None]:
class ReplayBuffer:
    def __init__(self, capacity):
        self.capacity = capacity
        self.buffer = []
        self.position = 0

    def push(self, state, action, reward, next_state, done):
        if len(self.buffer) < self.capacity:
            self.buffer.append(None)
        self.buffer[self.position] = (state, action, reward, next_state, done)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.buffer, batch_size)

# Declare all variables and objects for training
q_network = QNetworkSimple().to(device)
optimizer = optim.Adam(q_network.parameters(), lr=5e-5)
criterion = nn.MSELoss()
replay_buffer = ReplayBuffer(capacity=1000)
num_episodes = 100
batch_size = 100
epsilon = 0.9
epsilon_end = 0.01
epsilon_decay = .999
action_indices = {'u': 0, 'd': 1, 'l': 2, 'r': 3}
gamma = 0.99

if torch.cuda.is_available():
    print(f'{socket.gethostname()} has an available cuda GPU.')
    
if torch.backends.mps.is_available():
    print(f'{socket.gethostname()} has an available mps GPU.')

# Loop through episodes
for episode in range(num_episodes):
    # Start a new game board and get the initial state
    game = Board()
    state = game.get_normalized_flattened_board()

    # Loop through steps until the game is over
    while not game.game_over:
        # Get the possible moves
        possible_moves = game.possible_moves()

        # Choose an action using epsilon-greedy: explore with probability epsilon
        if np.random.uniform() < epsilon:
            action = random.choice(possible_moves)
        # Otherwise choose the best possible action according to the network
        else:
            with torch.no_grad():
                # Get action values from the network
                state_tensor = torch.FloatTensor(state).unsqueeze(0)
                action_values = q_network(state_tensor)

                # Filter action values for only possible actions (mask lives on the same device)
                masked_action_values = torch.full_like(action_values, float('-inf'))
                for move in possible_moves:
                    index = action_indices[move]
                    masked_action_values[0][index] = action_values[0][index]
                action = torch.argmax(masked_action_values).item()

                # Convert action index to action key
                for action_key, index in action_indices.items():
                    if index == action:
                        action = action_key
                        break

        # Move the tiles using the action and store the reward, state, and done
        game.move_tiles(action)
        reward = game.merges_in_last_move
        next_state = game.get_normalized_flattened_board()
        done = game.game_over
        replay_buffer.push(state, action, reward, next_state, done)

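        # Sample and update network once the buffer holds at least batch_size * 10 transitions
        # (">=" rather than ">": the buffer never grows past its capacity of 1000)
        if len(replay_buffer.buffer) >= batch_size * 10: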
            batch = replay_buffer.sample(batch_size)
            # Split batch into separate components, convert actions to indices
            states, actions, rewards, next_states, dones = zip(*batch)
            actions_modified = [action_indices[action] for action in actions]

            # Convert to tensors
            states = torch.FloatTensor(states).to(device)
            actions = torch.LongTensor(actions_modified).to(device)
            rewards = torch.FloatTensor(rewards).to(device)
            next_states = torch.FloatTensor(next_states).to(device)
            dones = torch.BoolTensor(dones).to(device)

            # Compute current Q values
            current_q_values = q_network(states).gather(1, actions.unsqueeze(1))

            # Compute next Q values without tracking gradients so the targets act as constants
            with torch.no_grad():
                next_q_values = q_network(next_states).max(1)[0]

                # Zero out Q values for transitions that ended the game
                next_q_values[dones] = 0.0

            # Compute target Q values
            target_q_values = rewards + (gamma * next_q_values)

            # Compute loss
            loss = criterion(current_q_values, target_q_values.unsqueeze(1))

            # Optimize the network
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Update the state
        state = next_state

    # Update epsilon, never letting it fall below epsilon_end
    epsilon = max(epsilon_end, epsilon * epsilon_decay)

    # Display progress
    if episode % 100 == 0:
        print(f"Ran {episode} episodes...")
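The ring-buffer behavior of 'ReplayBuffer' can be checked in isolation. The class is repeated here only so that the snippet runs on its own; the tiny capacity and the dummy transitions are assumptions chosen to make the overwrite visible.

```python
import random

class ReplayBuffer:
    """Same ring-buffer logic as the class above, repeated so this snippet is self-contained."""
    def __init__(self, capacity):
        self.capacity = capacity
        self.buffer = []
        self.position = 0

    def push(self, state, action, reward, next_state, done):
        if len(self.buffer) < self.capacity:
            self.buffer.append(None)
        self.buffer[self.position] = (state, action, reward, next_state, done)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.buffer, batch_size)

demo_buffer = ReplayBuffer(capacity=3)
for step in range(5):
    # Dummy transition: the state is just the step number
    demo_buffer.push(step, 'l', 0, step + 1, False)

# Steps 0 and 1 have been overwritten by steps 3 and 4
print([transition[0] for transition in demo_buffer.buffer])
print(len(demo_buffer.sample(2)))
```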

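The 'ReplayBuffer' class stores up to 'capacity' transitions of the form (state, action, reward, next_state, done). Once it is full, 'push' overwrites the oldest entry, and 'sample' draws a random batch so that the network trains on a mix of old and recent moves rather than only on consecutive ones. The training loop plays one game per episode. At every step it picks a move with an epsilon-greedy policy restricted to the currently possible moves, uses the number of merges made by that move as the reward, and stores the transition in the buffer. When the buffer holds enough transitions, a batch is sampled and the network is updated to reduce the mean squared error between its Q-value for the chosen action and the target 'reward + gamma * max Q(next_state)', where the second term is dropped when the game has ended. After each episode epsilon is multiplied by 'epsilon_decay'.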
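The target that the training loop regresses towards can be worked through by hand. The following plain-Python sketch computes one-step Q-learning targets for a made-up mini-batch; `td_targets` and all of the numbers are assumptions for illustration, and the real loop does the same arithmetic on tensors.

```python
def td_targets(rewards, next_q_values, dones, gamma=0.99):
    """One-step targets: reward + gamma * max Q(next_state), with no bootstrap after game over."""
    targets = []
    for reward, next_qs, done in zip(rewards, next_q_values, dones):
        bootstrap = 0.0 if done else max(next_qs)
        targets.append(reward + gamma * bootstrap)
    return targets

# Hypothetical mini-batch of three transitions
rewards = [1, 0, 2]                      # merges made by each move
next_q_values = [[0.5, 1.0, 0.2, 0.1],   # network output for each next state
                 [2.0, 1.5, 0.0, 0.3],
                 [9.9, 9.9, 9.9, 9.9]]   # ignored because the game ended
dones = [False, False, True]

print(td_targets(rewards, next_q_values, dones))
```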

In [None]:
def evaluate_model(num_games=100):
    """Play num_games greedy games with q_network and return the highest tile of each game."""
    top_scores = []
    total_score = 0
    # Reverse lookup from action index to action key
    index_to_action = {index: key for key, index in action_indices.items()}
    for game_num in range(num_games):
        game = Board()
        state = game.get_normalized_flattened_board()
        while not game.game_over:
            with torch.no_grad():
                # Get action values from the network
                state_tensor = torch.FloatTensor(state).unsqueeze(0)
                action_values = q_network(state_tensor)
                # Mask out moves that would not change the board
                possible_moves = game.possible_moves()
                masked_action_values = torch.full_like(action_values, float('-inf'))
                for move in possible_moves:
                    index = action_indices[move]
                    masked_action_values[0][index] = action_values[0][index]
                # Always take the best possible move (no exploration)
                action = torch.argmax(masked_action_values).item()
                chosen_action = index_to_action[action]
            game.move_tiles(chosen_action)
            state = game.get_normalized_flattened_board()
        total_score += game.score
        top_scores.append(game.highest_tile())
        if game_num % 100 == 0:
            print(f"Played {game_num} games...")
    avg_score = total_score / num_games
    print(f"Average Score: {avg_score}")
    return top_scores
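Both training and evaluation restrict the network's choice to moves that actually change the board. The sketch below shows that masking step, together with a conventional epsilon-greedy rule, on plain Python lists. The function names, the `rng` parameter, and the Q-values are assumptions for illustration rather than our exact tensor code.

```python
import random

ACTION_INDICES = {'u': 0, 'd': 1, 'l': 2, 'r': 3}

def greedy_action(q_values, possible_moves):
    """Return the legal move with the highest Q-value."""
    return max(possible_moves, key=lambda move: q_values[ACTION_INDICES[move]])

def epsilon_greedy_action(q_values, possible_moves, epsilon, rng=random):
    """Explore with probability epsilon, otherwise act greedily."""
    if rng.random() < epsilon:
        return rng.choice(possible_moves)
    return greedy_action(q_values, possible_moves)

q_values = [3.2, 0.1, 1.7, 2.5]   # hypothetical network output for u, d, l, r
print(greedy_action(q_values, ['d', 'l', 'r']))   # 'u' scores highest but is not legal

# Exploration rate after 100 episodes of multiplying by 0.999, starting from 0.9
# (the values used in the training cell); 0.01 plays the role of epsilon_end
epsilon = 0.9
for _ in range(100):
    epsilon = max(0.01, epsilon * 0.999)
print(round(epsilon, 3))
```

With these settings epsilon changes only slightly over 100 episodes, so a longer run or a faster decay would be needed before the agent relies mostly on its learned values.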

## Results


## Conclusions


### References

* [Goodfellow, et al., 2016] Ian Goodfellow, Yoshua Bengio, and Aaron Courville, [Deep Learning](http://www.deeplearningbook.org), MIT Press, 2016
* [Baluja, 2021] Michael Baluja, [Reinforcement Learning for 2048](https://github.com/michaelbaluja/rl-2048), 2021
* [Virdee, 2018] Navjinder Virdee, [Trained A Neural Network To Play 2048 using Deep-Reinforcement Learning](https://github.com/navjindervirdee/2048-deep-reinforcement-learning), 2018
* [Pan, 2019] Tianyi Pan, [Applied Reinforcement Learning with 2048](https://www.linkedin.com/pulse/part-1-applied-reinforcement-learning-2048-tianyi-pan), 2019
* [Goenawan, 2020] Nathaniel Goenawan, Simon Tao, and Katherine Wu, [What’s in a Game: Solving 2048 with Reinforcement Learning](https://web.stanford.edu/class/aa228/reports/2020/final41.pdf), Stanford, 2020
* [Paszke, 2023] Adam Paszke and Mark Towers, [Reinforcement Learning (DQN) Tutorial](https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html), PyTorch, 2023

