### 2048 GAME

In [1]:
import random
import numpy as np
from collections import deque
from keras.models import Sequential 
from keras.layers import Dense, Dropout, BatchNormalization
from keras.layers import Conv2D, Flatten
from keras.optimizers import Adam
from keras.regularizers import l2
from two_oh_four_eight import Game
import os

In [2]:
from tensorflow import keras

# Turn off Keras' interactive logging so the many single-sample
# predict()/fit() calls made during training do not flood the cell output.
keras.utils.disable_interactive_logging()

# Capacity of the agent's replay buffer (used as the deque maxlen in DQNAgent).
MAX_LEN = 100000
env = Game(max_moves=100000, max_score=100000)

# Sizes are reported by the environment; the cell output shows (16, 4).
state_size = env.state_size
action_size = env.action_size

# Number of transitions sampled per replay update, and episodes to play.
BATCH_SIZE = 1024
N_EPISODES = 10000

# Directory that DQNAgent.save() writes model files into.
output_dir = "model_output"

# exist_ok=True replaces the check-then-create pair: it is a single call and
# does not fail if the directory appears between the check and the creation.
os.makedirs(output_dir, exist_ok=True)

state_size, action_size

(16, 4)

In [3]:
class DQNAgent:
    """Epsilon-greedy deep Q-learning agent with a replay buffer.

    The agent owns a convolutional Q-network that maps one encoded board of
    shape ``INPUT_SHAPE`` to ``action_size`` Q-values. It learns in two ways:
    an online update after every step (``train_short_memory``) and a batched
    update over transitions sampled from the replay buffer
    (``train_long_memory``).

    Relies on the module-level names ``MAX_LEN`` (buffer capacity),
    ``BATCH_SIZE`` (replay batch size) and ``output_dir`` (save location).
    """

    # Shape of a single encoded board as fed to the network.
    # NOTE(review): the 13 channels are presumably a one-hot tile encoding
    # produced by Game.get_state() - confirm against the environment.
    INPUT_SHAPE = (4, 4, 13)

    def __init__(self, state_size, action_size):
        """Create the agent and build its Q-network.

        Args:
            state_size: Size of the environment state; stored but not used to
                shape the network (the input shape is ``INPUT_SHAPE``).
            action_size: Number of discrete actions, i.e. network outputs.
        """
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=MAX_LEN)  # oldest transitions drop out first
        self.gamma = 0.95           # discount factor for future rewards
        self.epsilon = 0.95         # current probability of a random action
        self.epsilon_decay = 0.9999  # multiplicative decay per training call
        self.epsilon_min = 0.00     # decay stops once epsilon reaches this
        self.learning_rate = 0.001  # step size handed to the Adam optimizer
        self.model = self._build_model()

    def _build_model(self):
        """Build and compile the Q-network (two conv layers, three dense)."""
        model = Sequential()
        model.add(Conv2D(filters=64, kernel_size=(2, 2), activation='relu',
                         input_shape=self.INPUT_SHAPE))
        model.add(Conv2D(filters=128, kernel_size=(2, 2), activation='relu'))
        model.add(Flatten())
        model.add(Dense(1024, activation="relu"))
        model.add(BatchNormalization())
        model.add(Dense(1024, activation="relu"))
        model.add(BatchNormalization())
        model.add(Dense(1024, activation="relu"))
        model.add(BatchNormalization())
        # Linear head: Q-values are unbounded regression targets.
        model.add(Dense(self.action_size, activation="linear"))
        # Pass the configured learning rate explicitly; with the plain
        # 'adam' string self.learning_rate was silently ignored.
        model.compile(optimizer=Adam(learning_rate=self.learning_rate), loss='mse')
        return model

    def _as_batch(self, state):
        """Return ``state`` reshaped to a batch holding one board."""
        return np.asarray(state).reshape((1, *self.INPUT_SHAPE))

    def _decay_epsilon(self):
        """Shrink the exploration rate by one decay step, down to the floor."""
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def remember(self, state, action, reward, next_state, done):
        """Append one transition tuple to the replay buffer."""
        self.memory.append((state, action, reward, next_state, done))

    def train_long_memory(self):
        """Fit the network on a batch sampled from the replay buffer.

        Samples ``BATCH_SIZE`` transitions without replacement, or uses the
        whole buffer when it holds no more than that. Does nothing (and does
        not decay epsilon) when the buffer is empty.
        """
        if not self.memory:
            # Nothing to learn from yet; predicting on an empty batch fails.
            return

        if len(self.memory) > BATCH_SIZE:
            mini_sample = random.sample(self.memory, BATCH_SIZE)  # list of tuples
        else:
            mini_sample = list(self.memory)

        states, actions, rewards, next_states, dones = zip(*mini_sample)
        # Stored states may or may not carry a leading batch axis of 1;
        # reshaping normalises both forms to INPUT_SHAPE.
        states = np.array([np.asarray(s).reshape(self.INPUT_SHAPE) for s in states])
        next_states = np.array(
            [np.asarray(s).reshape(self.INPUT_SHAPE) for s in next_states]
        )
        actions = np.asarray(actions, dtype=int)
        rewards = np.asarray(rewards)
        dones = np.asarray(dones, dtype=bool)

        # Bellman target: terminal transitions get the bare reward, all
        # others add the discounted best Q-value of the successor state.
        best_next = np.amax(self.model.predict_on_batch(next_states), axis=1)
        targets = np.where(dones, rewards, rewards + self.gamma * best_next)

        # Start from the current predictions so only the taken action's
        # Q-value contributes to the loss.
        targets_full = self.model.predict_on_batch(states)
        targets_full[np.arange(len(mini_sample)), actions] = targets
        self.model.fit(states, targets_full, epochs=1, verbose=0)
        self._decay_epsilon()

    def train_short_memory(self, state, action, reward, next_state, done):
        """Fit the network on the single transition that was just observed.

        NOTE(review): this fits a batch of one through BatchNormalization
        layers, so batch statistics come from a single example - confirm
        this is intended.
        """
        target = reward
        if not done:
            next_q = self.model.predict(self._as_batch(next_state))[0]
            target = reward + self.gamma * np.amax(next_q)
        state_batch = self._as_batch(state)
        target_full = self.model.predict(state_batch)
        target_full[0][action] = target
        self.model.fit(state_batch, target_full, epochs=1, verbose=0)
        self._decay_epsilon()

    def get_action(self, state):
        """Choose an action epsilon-greedily.

        With probability ``epsilon`` returns a uniformly random action index;
        otherwise returns the index of the highest predicted Q-value.

        Returns:
            int: action index in ``range(action_size)``.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        # Reshape to a one-board batch, exactly as train_short_memory does,
        # so a state without a leading batch axis is handled consistently.
        q_values = self.model.predict(self._as_batch(state))
        return int(np.argmax(q_values))

    def save(self, name):
        """Save the model to ``<output_dir>/<name>``."""
        self.model.save(f"{output_dir}/{name}")



In [4]:
from IPython.display import clear_output
   
# Single agent shared by every episode, sized from the environment.
agent = DQNAgent(state_size, action_size)
# Highest episode score seen so far; train() updates it and saves a model
# file whenever an episode beats it.
best_score = 0
# One (score, invalid_moves, highest_tile) tuple per finished episode,
# appended by train() and re-printed on every step.
results = []

def train():
    """Play one episode, learning online, and record its outcome.

    Each step redraws the board and the history of finished episodes, picks
    an action, applies it, fits the network on that single transition and
    stores the transition in the agent's replay buffer. When the episode
    ends, the agent is trained once on a replay batch. Afterwards the
    episode's (score, invalid_moves, highest_tile) tuple is appended to the
    module-level ``results`` and, if the score beats ``best_score``, the
    model is saved under a name containing the score.

    A move counts as invalid when it leaves the state unchanged. Only the
    first move of a run of consecutive invalid moves is stored, so the
    replay buffer is not filled with identical no-op transitions.
    """
    global best_score
    global results
    env.reset()
    done = False
    invalid_moves = 0
    has_invalid_already = False

    while not done:
        clear_output(wait=True)
        env.render()

        for index, row in enumerate(results):
            print(f"Episode #{index} ended with score: {row[0]} and {row[1]} invalid moves, highest tile: {row[2]}")

        state = env.get_state()
        action = agent.get_action(state)
        reward, done, score = env.step(action)
        state_new = env.get_state()

        agent.train_short_memory(state, action, reward, state_new, done)

        print(f"Action: {action}, Reward: {reward}")

        is_invalid = np.array_equal(state, state_new)
        if is_invalid:
            invalid_moves += 1

        # Skip only repeated invalid moves; everything else is stored.
        if not (is_invalid and has_invalid_already):
            agent.remember(state, action, reward, state_new, done)
        has_invalid_already = is_invalid

        # Replay AFTER storing the step, so the terminal transition of this
        # episode is already in the buffer when the batch is sampled.
        if done:
            agent.train_long_memory()

    # results holds one entry per finished episode, so its length is this
    # episode's zero-based number; no dependence on the caller's loop variable.
    episode = len(results)
    print(f"Episode #{episode} ended with score: {env.score} and {invalid_moves} invalid moves\n")

    if env.score > best_score:
        best_score = env.score
        agent.save(f"2048_{best_score}_{invalid_moves}_{env.highest_tile}.keras")

    results.append((env.score, invalid_moves, env.highest_tile))


# Play N_EPISODES episodes. The loop runs at top level, so `index_episode`
# is a module-level name that stays visible to other code in the notebook.
try:
    for index_episode in range(N_EPISODES):
        train()
finally:
    # Runs on normal completion, on an exception and on a manual interrupt,
    # so the most recent model is always written out.
    agent.save("2048_last.keras")



2 0 0 0 
2 0 0 2 
8 4 0 0 
2 8 4 0 
Episode #0 ended with score: 932 and 13 invalid moves, highest tile: 128
Episode #1 ended with score: 412 and 12 invalid moves, highest tile: 32
Episode #2 ended with score: 260 and 5 invalid moves, highest tile: 32
Episode #3 ended with score: 156 and 4 invalid moves, highest tile: 16
Episode #4 ended with score: 1968 and 22 invalid moves, highest tile: 128
Episode #5 ended with score: 588 and 19 invalid moves, highest tile: 64
Episode #6 ended with score: 604 and 4 invalid moves, highest tile: 64
Action: 0, Reward: -5.0
