In [32]:
import torch
from torch import nn
import torch.optim as optim
import numpy as np
import random

In [33]:
class NN2048(nn.Module):
    def __init__(self, input_size=16, filter1=512, filter2=4096, drop_prob=0.):
        super(NN2048, self).__init__()
        self.conv_a = nn.Conv2d(in_channels=input_size, out_channels=filter1, kernel_size=(2,1), padding=0)
        self.conv_b = nn.Conv2d(in_channels=input_size, out_channels=filter1, kernel_size=(1,2), padding=0)
        self.conv_aa = nn.Conv2d(in_channels=filter1, out_channels=filter2, kernel_size=(2,1), padding=0)
        self.conv_ab = nn.Conv2d(in_channels=filter1, out_channels=filter2, kernel_size=(1,2), padding=0)
        self.conv_ba = nn.Conv2d(in_channels=filter1, out_channels=filter2, kernel_size=(2,1), padding=0)
        self.conv_bb = nn.Conv2d(in_channels=filter1, out_channels=filter2, kernel_size=(1,2), padding=0)
        self.relu = nn.ReLU()
        self.W_aa = nn.Linear(filter2 * 8, 1)
        self.W_ab = nn.Linear(filter2 * 9, 1)
        self.W_ba = nn.Linear(filter2 * 9, 1)
        self.W_bb = nn.Linear(filter2 * 8, 1)

    def flatten(self, x):
        N = x.size()[0]
        return x.view(N, -1)
        
    def forward(self, x):
        x = x.float()
        a = self.relu(self.conv_a(x))
        b = self.relu(self.conv_b(x))
        aa = self.flatten(self.relu(self.conv_aa(a)))
        ab = self.flatten(self.relu(self.conv_ab(a)))
        ba = self.flatten(self.relu(self.conv_ba(b)))
        bb = self.flatten(self.relu(self.conv_bb(b)))
        out = self.W_aa(aa) + self.W_ab(ab) + self.W_ba(ba) + self.W_bb(bb)
        return out
        

In [34]:
singleScore=[0,0,4,16,48,128,320,768,1792,4096,9216,20480,45056,98304,212992,458752,983040]
moveDict=np.load('move.npy')

def add_two(mat):
    indexs=np.argwhere(mat==0)
    index=np.random.randint(0,len(indexs))
    mat[tuple(indexs[index])] = 1
    return mat

def game_state(mat):
    return 'not over' if np.any(mat == 0) or np.any(mat[:, 0:-1]==mat[:, 1:]) or np.any(mat[0:-1, :]==mat[1:, :]) else 'lose'

def move(list):
    return moveDict[list[0],list[1],list[2],list[3],:]

def lookup(x):
    return singleScore[x]

lookup = np.vectorize(lookup)

def getScore(matrix):
    return np.sum(lookup(matrix))

In [35]:
def getMove(grid):
    board_list = []
    for i in range(4):
        newGrid=moveGrid(grid, i)
        if not isSame(grid,newGrid):
            board_list.append((i, newGrid, getScore(newGrid)))
    return board_list
        
def moveGrid(grid,i):
    # new=np.zeros((4,4),dtype=np.int)
    new = None
    if i==0:
        # move up
        grid=np.transpose(grid)
        new = np.stack([move(grid[row,:]) for row in range(4)], axis = 0).astype(int).T
    elif i==1:
        # move left
        new = np.stack([move(grid[row,:]) for row in range(4)], axis = 0).astype(int)
    elif i==2:
        # move down
        grid=np.transpose(grid)
        new = np.stack([np.flip(move(np.flip(grid[row,:]))) for row in range(4)], axis = 0).astype(int).T
    elif i==3:
        # move right
        new = np.stack([np.flip(move(np.flip(grid[row,:]))) for row in range(4)], axis = 0).astype(int)
    return new

def isSame(grid1,grid2):
    return np.all(grid1==grid2)

In [36]:
def make_input(grid):
    r = np.zeros(shape=(16, 4, 4))
    for i in range(4):
        for j in range(4):
            r[grid[i, j], i, j]=1
    return r

def Vchange(grid, v):
    g0 = grid
    g1 = g0[:,::-1,:]
    g2 = g0[:,:,::-1]
    g3 = g2[:,::-1,:]
    r0 = grid.swapaxes(1,2)
    r1 = r0[:,::-1,:]
    r2 = r0[:,:,::-1]
    r3 = r2[:,::-1,:]
    xtrain = np.array([g0,g1,g2,g3,r0,r1,r2,r3])
    ytrain = np.array([v]*8) 
    return torch.from_numpy(xtrain), \
            torch.from_numpy(ytrain).float()#.cuda()

In [43]:
def gen_sample_and_learn(model, optimizer, loss_fn, is_train = False, explorationProb=0.2):
    model.eval()
    game_len = 0
    last_loss = 0
    last_grid = np.zeros((4,4),dtype=np.int)
    last_grid = add_two(last_grid)
    pre_score = 0
    
    while True:
        matrix = add_two(last_grid) #use last_grid before adding two??
        board_list = getMove(matrix)
        
        if len(board_list) == 0:
            assert(game_state(last_grid)=='lose')
            best_v = 0
            best_grid = None
        else:
            game_len += 1
            best_v = None
            boards = torch.from_numpy(np.array([make_input(g) for m, g, s in board_list], dtype=np.float))
            preds = model(boards)
            for (move, newGrid, score), pred  in zip(board_list, preds):
                v = score - pre_score + pred
#                 print ('score', score - pre_score)
                if best_v is None or v > best_v:
                    best_v = v
                    best_score = score
                    best_grid = newGrid
                    
        if is_train:
            x, y = Vchange(make_input(last_grid), best_v.item())
            model.train()
            optimizer.zero_grad()
            pred = model(x)
#             print (pred)
            loss = loss_fn(pred, y) / 2
            loss.backward()
            last_loss = loss.item()
            nn.utils.clip_grad_norm_(model.parameters(), 5.0)
            optimizer.step()
            model.eval()
        
        if best_grid is None:
            break
        
        if game_len % 20 == 0:
            print ('game length',game_len, last_loss, best_score, best_v)
        
        # gibbs sampling or espilon-greedy
        if is_train and random.random() < explorationProb:
            (_, last_grid, pre_score) = random.choice(board_list)
        else:
            last_grid = best_grid
            pre_score = best_score
            
    return game_len, last_loss, last_grid, pre_score

In [44]:
num_epochs = 200
lr = 1e-3
weight_decay = 1e-5


def train(model):
    optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay, betas=(0.5, 0.999))
    loss=nn.MSELoss()
    epoch = 0
    while epoch != num_epochs:
        epoch += 1
        game_len, last_loss, last_grid, pre_score = gen_sample_and_learn(model, optimizer, loss, True, 0)
        print ('epoch', epoch, game_len, last_loss, last_grid, pre_score)
        
def test(model):
    while epoch != num_epochs:
        epoch += 1
        game_len, last_loss, last_grid, pre_score = gen_sample_and_learn(model, optimizer, loss, False)
        print ('epoch', epoch, game_len, last_loss, last_grid, pre_score)
    
model = NN2048()
train(model)
            

tensor([0.0732], grad_fn=<AddBackward0>)
[[1 0 0 0]
 [0 0 0 0]
 [0 0 1 0]
 [0 0 0 0]]
tensor([12.6450], grad_fn=<AddBackward0>)
[[0 0 0 1]
 [0 0 0 0]
 [0 0 0 1]
 [0 0 1 0]]
tensor([23.7404], grad_fn=<AddBackward0>)
[[0 0 0 0]
 [0 0 0 0]
 [1 0 0 0]
 [0 0 1 2]]
tensor([55.1405], grad_fn=<AddBackward0>)
[[1 0 1 2]
 [0 0 0 0]
 [0 0 1 0]
 [0 0 0 0]]
tensor([101.8547], grad_fn=<AddBackward0>)
[[1 0 2 2]
 [0 0 0 0]
 [0 0 0 0]
 [0 0 1 0]]


KeyboardInterrupt: 