In [1]:
import numpy as np
# import cupy as np
import random
import matplotlib.pyplot as plt
from typing import List, Tuple, Dict
from dataclasses import dataclass
from enum import Enum
import time

import torch

In [2]:
# print(torch.__version__)
# print(torch.backends.mps.is_available())
# print(torch.tensor([1,2,3], device='mps'))  # Should succeed on Apple Silicon

In [3]:
class Direction(Enum):
    LEFT = 0
    UP = 1
    RIGHT = 2
    DOWN = 3

In [4]:
import numpy as np

class Game2048Env:
    def __init__(self):
        self.grid_size = 4
        self.reset()
        
    def reset(self):
        self.board = np.zeros((self.grid_size, self.grid_size), dtype=int)
        self.spawn_tile()
        self.spawn_tile()
        self.score = 0
        return self.board.copy()
    
    def spawn_tile(self):
        empty = list(zip(*np.where(self.board == 0)))
        if empty:
            x, y = empty[np.random.randint(len(empty))]
            self.board[x, y] = 2 if np.random.random() < 0.9 else 4
        
    def step(self, action: Direction):
        moved, reward = self.move(action.value)
        if moved:
            self.spawn_tile()
        else:
            # Stop if invalid move
            return self.board.copy(), reward, True, {}
        done = not self.can_move()
        self.score += reward
        return self.board.copy(), reward, done, {}
    
    def move(self, direction):
        board = np.copy(self.board)
        reward = 0
        moved = False

        # Rotate board so all moves are left-moves
        for _ in range(direction):
            board = np.rot90(board)
            
        for i in range(self.grid_size):
            tiles = board[i][board[i] != 0]  # Extract non-zero
            merged = []
            j = 0
            while j < len(tiles):
                if j + 1 < len(tiles) and tiles[j] == tiles[j + 1]:
                    merged_val = tiles[j] * 2
                    reward += 10
                    merged.append(merged_val)
                    j += 2  # Skip next
                    moved = True
                else:
                    merged.append(tiles[j])
                    reward += 1
                    j += 1
            # Pad with zeros to the right
            merged += [0] * (self.grid_size - len(merged))
            # Detect if move or merge happened
            if not np.array_equal(board[i], merged):
                moved = True
            board[i] = merged

        # Restore original orientation
        for _ in range((4 - direction) % 4):
            board = np.rot90(board)
            
        if moved:
            self.board = board

        return moved, reward

    
    def can_move(self):
        for direction in range(4):
            temp_board = self.board.copy()
            moved, _ = self.move(direction)
            self.board = temp_board  # Restore original
            if moved:
                return True
        return False



In [5]:
game = Game2048Env()
state = game.reset()
done = False

def print_board(board):
    for x in board:
        print("\t".join(f"{v:4}" for v in x))
    print("-" * 20)

print_board(state)

done = False

while not done:  # Play some random moves

    # action = Direction(np.random.randint(4))  # Random action for demonstration
    action = Direction.LEFT  # Fixed action for demonstration
    state, reward, done, _ = game.step(action)

    print(f"Action: {action.name} | Score: {game.score}")
    print(f"Reward: {reward} | Done: {done}")
    
    print_board(state)
    
    

   0	   0	   0	   2
   0	   0	   0	   0
   0	   0	   0	   0
   0	   0	   4	   0
--------------------
Action: LEFT | Score: 2
Reward: 2 | Done: False
   2	   0	   4	   0
   0	   0	   0	   0
   0	   0	   0	   0
   4	   0	   0	   0
--------------------
Action: LEFT | Score: 5
Reward: 3 | Done: False
   2	   4	   0	   0
   0	   0	   0	   2
   0	   0	   0	   0
   4	   0	   0	   0
--------------------
Action: LEFT | Score: 9
Reward: 4 | Done: False
   2	   4	   0	   0
   2	   0	   0	   0
   2	   0	   0	   0
   4	   0	   0	   0
--------------------
Action: LEFT | Score: 9
Reward: 5 | Done: True
   2	   4	   0	   0
   2	   0	   0	   0
   2	   0	   0	   0
   4	   0	   0	   0
--------------------


In [6]:
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

# Determine the best available device
def get_device():
    if torch.cuda.is_available():
        return torch.device('cuda')
    elif torch.backends.mps.is_available():
        return torch.device('mps')
    else:
        return torch.device('cpu')

DEVICE = get_device()
print(f"Using device: {DEVICE}")

class SimpleNeuralNetwork(nn.Module):
    """Simple feedforward neural network using PyTorch"""

    def __init__(self, input_size: int = 16, hidden_layers: List[int] = [256], output_size: int = 4, empty: bool = False):
        super().__init__()
        
        if empty:
            return
        
        # Build layers using PyTorch modules
        layers = []
        prev_size = input_size
        
        # Add hidden layers
        for hidden_size in hidden_layers:
            layers.append(nn.Linear(prev_size, hidden_size))
            # layers.append(nn.Tanh())
            layers.append(nn.ReLU())
            prev_size = hidden_size
            
        # Add output layer (no activation)
        layers.append(nn.Linear(prev_size, output_size))
        
        self.network = nn.Sequential(*layers)
        
        # Initialize weights using He initialization
        self._initialize_weights()
        
        # Move to device
        self.to(DEVICE)
    
    def _initialize_weights(self):
        """Initialize weights using He initialization"""
        for module in self.modules():
            if isinstance(module, nn.Linear):
                nn.init.kaiming_normal_(module.weight, nonlinearity='tanh')
                nn.init.zeros_(module.bias)
    
    def forward(self, x):
        """Forward pass through the network"""
        # Convert numpy array to tensor if needed and move to device
        if isinstance(x, np.ndarray):
            x = torch.from_numpy(x).float().to(DEVICE)
        elif isinstance(x, torch.Tensor):
            x = x.to(DEVICE)
        
        return self.network(x)
    
    def mutate(self, mutation_rate: float = 0.1, mutation_strength: float = 0.5):
        """Mutate the network's weights and biases"""
        with torch.no_grad():
            for param in self.parameters():
                if torch.rand(1).item() < mutation_rate:
                    mutation = torch.randn_like(param) * mutation_strength
                    param.add_(mutation)

Using device: mps


In [7]:
@dataclass
class GameResult:
    score: int
    max_tile: int
    moves: int

class Player:
    def __init__(self, network: SimpleNeuralNetwork):
        self.network = network

    def play(self, env: Game2048Env, max_steps: int = 100) -> GameResult:
        state = env.reset()
        total_reward = 0
        done = False
        steps = 0

        while not done and steps < max_steps:
            action = self.next_move(state)

            state, reward, done, _ = env.step(action)
            total_reward += reward
            steps += 1

        return GameResult(score=total_reward, max_tile=np.max(state), moves=steps)
    
    def next_move(self, state: np.ndarray) -> Direction:
        self.network.eval()  # Set to evaluation mode
        with torch.no_grad():
            flat_state = state.flatten() / 2048.0  # Normalize input
            q_values = self.network.forward(flat_state)
            # Move back to CPU for numpy conversion
            q_values_cpu = q_values.cpu()
            action = Direction(q_values_cpu.numpy().argmax())
            return action

In [8]:
class EvolutionaryOptimizer:
    def __init__(
            self, 
            population_size: int = 50, 
            elite_size: int = 10,
            mutation_rate: float = 0.1, 
            mutation_strength: float = 0.5,
            hidden_layers: List[int] = [32]
        ):
        self.population_size = population_size
        self.elite_size = elite_size
        self.mutation_rate = mutation_rate
        self.mutation_strength = mutation_strength
        self.hidden_layers = hidden_layers
        
        # Create initial population
        self.population = []
        for _ in range(population_size):
            network = SimpleNeuralNetwork(hidden_layers=hidden_layers)
            # Move to MPS if available
            if torch.backends.mps.is_available():
                network = network.to('mps')
            self.population.append(network)

    def evaluate(self, env: Game2048Env, games_per_player: int = 5, max_steps: int = 100) -> List[Tuple[SimpleNeuralNetwork, float, int]]:
        results = []
        for network in self.population:
            player = Player(network)
            total_score = 0
            best_score = 0
            for _ in range(games_per_player):
                game_result = player.play(env, max_steps=max_steps)
                total_score += game_result.score
                if game_result.score > best_score:
                    best_score = game_result.score
            avg_score = total_score / games_per_player
            results.append((network, avg_score, best_score))
        return results

    def select_and_breed(self, evaluated: List[Tuple[SimpleNeuralNetwork, float]]) -> None:
        # Sort by score descending
        evaluated.sort(key=lambda x: x[1], reverse=True)
        elite = evaluated[:self.elite_size] 

        new_population = []
        # Keep elite networks
        for net, _, _ in elite:
            new_population.append(net)
        
        # Create offspring by mutating elite networks
        while len(new_population) < self.population_size:
            parent = random.choice(elite)[0]
            
            # Create a child by copying the parent's state
            child = SimpleNeuralNetwork(hidden_layers=self.hidden_layers)
            child.load_state_dict(parent.state_dict())
            
            # Move to same device as parent
            child = child.to(next(parent.parameters()).device)
            
            # Mutate the child
            child.mutate(self.mutation_rate, self.mutation_strength)
            new_population.append(child)

        self.population = new_population[:self.population_size]

    def run_generation(self, env: Game2048Env, games_per_player: int = 5, max_steps: int = 1000):
        evaluated = self.evaluate(env, games_per_player, max_steps=max_steps)
        avg_score = sum(score for _, score, _ in evaluated) / len(evaluated)
        best_score = max(score for _, _, score in evaluated)
        
        self.select_and_breed(evaluated)
        
        return self.population, avg_score, best_score

In [9]:
import pickle

def save_network(network: SimpleNeuralNetwork, filename: str):
    torch.save(network.state_dict(), filename)

def load_network(filename: str, hidden_layers: List[int]) -> SimpleNeuralNetwork:
    network = SimpleNeuralNetwork(hidden_layers=hidden_layers)
    network.load_state_dict(torch.load(filename, map_location=DEVICE))
    network.to(DEVICE)
    return network

def save_population(population: List[SimpleNeuralNetwork], filename: str):
    with open(filename, 'wb') as f:
        pickle.dump(population, f)   

def load_population(filename: str) -> List[SimpleNeuralNetwork]:
    with open(filename, 'rb') as f:
        population = pickle.load(f)
    return population

In [None]:
from datetime import timedelta 

def main():
    env = Game2048Env()

    generations = 5000
    games_per_player = 10
    max_steps = 10000
    hidden_layers = [2048, 2048]

    optimizer = EvolutionaryOptimizer(
        population_size=100, 
        elite_size=50, 
        mutation_rate=0.2, 
        mutation_strength=0.5, 
        hidden_layers=hidden_layers
    )

    avg_scores = []

    loop_start_time = time.time()

    # Create directory for saving networks
    import os
    folder = f"networks/{'_'.join(str(x) for x in hidden_layers)}"
    os.makedirs(folder, exist_ok=True)

    best_score = 0

    for gen in range(generations):
        print(f"=== Generation {gen+1}/{generations} ===")
        
        (population, avg_score, best_score) = optimizer.run_generation(env, games_per_player, max_steps)
        avg_scores.append(avg_score)
        elapsed_time = time.time() - loop_start_time
        average_time_per_iteration = elapsed_time / (gen + 1)
        duration = str(timedelta(seconds=(average_time_per_iteration * (generations - gen + 1))))

        print(f"⏳ {duration} | Generation {gen+1}/{generations} - Average Score: {avg_score} - Best Score: {best_score} ")
        # Save best network every 10 generations
        if (gen % 10 == 0) or (gen == generations - 1):
            print(f"Saving population at generation {gen+1}")
            save_population(population, f'{folder}/population_gen_{gen+1}.pkl')

    
    # evaluated = optimizer.evaluate(env, 10)
    # best_network, best_score = max(evaluated, key=lambda x: x[1])

    # Plot average scores over generations
    plt.plot(range(1, generations + 1), avg_scores)
    plt.xlabel('Generation')
    plt.ylabel('Average Score')
    plt.title('Evolution of Average Score over Generations')
    plt.show()

if __name__ == "__main__":
    main()

=== Generation 1/5000 ===


In [None]:
population = load_population('networks/512_512/population_gen_991.pkl')

best_block = 0
best_score = 0

games_per_player = 10

for i, net in enumerate(population):
    player = Player(net)

    for _ in range(games_per_player):
        env = Game2048Env()
        while True:
            board = env.board
            # print_board(board)
            action = player.next_move(board)
            prev_state = state.copy()
            state, reward, done, _ = env.step(action)
            # print(f"Action: {action.name}, Best: {np.max(state)}")
            if (prev_state == state).all():
                # print("nop")
                # print_board(prev_state)
                # print_board(state)
                break
            if done:
                # print("Game Over")
                # print_board(state)
                break

        best_block = max(best_block, np.max(state))
        best_score = max(best_score, env.score)
        print(f"Player {i+1}/{len(population)} - Best Tile: {best_block} | Score: {env.score} | Max Score: {best_score}")
        print("===================================")    

    #time.sleep(.2)  # Pause for a second to visualize



Player 1/100 - Best Tile: 16 | Score: 547 | Max Score: 547
Player 1/100 - Best Tile: 32 | Score: 559 | Max Score: 559
Player 1/100 - Best Tile: 32 | Score: 560 | Max Score: 560
Player 1/100 - Best Tile: 32 | Score: 506 | Max Score: 560
Player 1/100 - Best Tile: 32 | Score: 356 | Max Score: 560
Player 1/100 - Best Tile: 32 | Score: 565 | Max Score: 565
Player 1/100 - Best Tile: 32 | Score: 589 | Max Score: 589
Player 1/100 - Best Tile: 32 | Score: 853 | Max Score: 853
Player 1/100 - Best Tile: 32 | Score: 598 | Max Score: 853
Player 1/100 - Best Tile: 32 | Score: 290 | Max Score: 853
Player 2/100 - Best Tile: 32 | Score: 775 | Max Score: 853
Player 2/100 - Best Tile: 32 | Score: 470 | Max Score: 853
Player 2/100 - Best Tile: 32 | Score: 757 | Max Score: 853
Player 2/100 - Best Tile: 32 | Score: 300 | Max Score: 853
Player 2/100 - Best Tile: 32 | Score: 659 | Max Score: 853
Player 2/100 - Best Tile: 32 | Score: 974 | Max Score: 974
Player 2/100 - Best Tile: 32 | Score: 685 | Max Score: 9