In [None]:
import random
import numpy as np
import pandas as pd
from collections import deque
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.model_selection import train_test_split
import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
from torchvision import datasets
from torchvision.transforms import ToTensor
import copy
from io import BytesIO
from IPython.display import Image, display_png, clear_output
import matplotlib.pyplot as plt
from jupyterplot import ProgressPlot

In [None]:
import snake

class Game:
    """Environment-style adapter around the native ``snake.PyGame`` engine.

    The wrapped engine is kept in ``self.game``; every method here only
    forwards to it and converts the results into NumPy / pandas objects.
    """

    def __init__(self, width, height):
        """Create the engine for a ``width`` x ``height`` board and tick it once."""
        self.game = snake.PyGame(width, height)
        self.game.tick()

    def action_size(self):
        """Number of discrete actions accepted by ``step``."""
        return 3

    def state_shape(self):
        """Return ``(-1, cells)``, where ``cells`` is the product of the engine's size pair."""
        width, height = self.game.size()
        return (-1, width * height)

    def state(self):
        """Engine model state as a float32 array reshaped to ``state_shape()``."""
        flat = np.array(self.game.state_model())
        return flat.reshape(self.state_shape()).astype(np.float32)

    def step(self, action):
        """Apply ``action``, advance one tick and return ``(state, reward, done)``.

        The reward is the score gained during the tick, or ``-1`` when the
        engine reports that the game is over.
        """
        engine = self.game
        engine.input_turn(action)

        score_before = engine.score()
        engine.tick()
        score_after = engine.score()
        finished = engine.done()

        # A finished game overrides whatever score delta was collected.
        reward = -1 if finished else score_after - score_before
        return self.state(), reward, finished

    def reset(self):
        """Reset the engine, tick it once and return the resulting state."""
        engine = self.game
        engine.reset()
        engine.tick()
        return self.state()

    def score(self):
        """Current score as reported by the engine."""
        return self.game.score()

    def draw(self, clear=True):
        """Render the board as a colour-coded DataFrame in the notebook output.

        Args:
            clear: When true, the previous cell output is cleared first.
        """
        palette = {0: 'grey', 1: 'blue', 2: 'white', 3: 'red'}

        def cell_css(value):
            return 'background-color: %s' % palette[value]

        if clear:
            clear_output(wait=True)
        grid = np.array(self.game.state()).reshape(self.game.size()).astype(int)
        display(pd.DataFrame(grid).style.applymap(cell_css))

# Module-level environment on a 9x9 board; later cells reuse this `game` object.
game = Game(9, 9)
# Last expression of the cell: shows the (-1, width * height) state shape.
game.state_shape()

In [None]:
from collections import OrderedDict

class NeuralNetwork(nn.Module):
    """Fully connected network: ``hidden_layers`` Linear+ReLU blocks and a linear head.

    The module owns its loss function (Smooth L1) and optimizer (plain SGD),
    so ``batch_train`` is a self-contained fitting step.

    Args:
        input_shape: Shape tuple; the element at index 1 is used as the number
            of input features (e.g. ``(-1, 81)``).
        output_size: Number of output units.
        hidden_layer_size: Width of every hidden layer.
        hidden_layers: Number of Linear+ReLU blocks; ``0`` yields a single
            linear layer from input to output.
    """

    def __init__(self, input_shape, output_size, hidden_layer_size = 256, hidden_layers = 3):
        super().__init__()

        self.input_shape = input_shape
        self.input_size = input_shape[1]
        self.output_size = output_size

        n = hidden_layer_size

        # Layer names are part of the state_dict keys; keep them stable so
        # existing checkpoints continue to load.
        od = OrderedDict()
        for i in range(hidden_layers):
            od[f'linear_{i}'] = nn.Linear(self.input_size if i == 0 else n, n)
            od[f'relu_{i}'] = nn.ReLU()

        self.relu_stack = nn.Sequential(od)
        # Without hidden layers the head is fed the raw input directly.
        head_inputs = n if hidden_layers > 0 else self.input_size
        self.output = nn.Linear(head_inputs, self.output_size)

        # loss function, optimizer
        self.loss_fn = nn.SmoothL1Loss()
        self.optimizer = torch.optim.SGD(
            self.parameters(),
            lr=0.00001,
        )

    def forward(self, x):
        """Run ``x`` through the hidden stack and the output layer."""
        return self.output(self.relu_stack(x))

    def predict(self, x):
        """Evaluate the network on a NumPy array without tracking gradients.

        Switches the module to eval mode as a side effect.

        Args:
            x: NumPy array convertible with ``torch.from_numpy``.

        Returns:
            torch.Tensor with the network output.
        """
        self.eval()
        with torch.no_grad():
            return self(torch.from_numpy(x))

    def batch_train(self, x, y, batch_size=64, epochs=1):
        """Fit the network to ``(x, y)`` with shuffled mini-batches.

        Args:
            x: Array-like of inputs, converted to a float32 tensor.
            y: Array-like of targets, converted to a float32 tensor.
            batch_size: Mini-batch size for the data loader.
            epochs: Number of passes over the data.
        """
        features = torch.as_tensor(x, dtype=torch.float32)
        targets = torch.as_tensor(y, dtype=torch.float32)
        loader = DataLoader(
            TensorDataset(features, targets),
            batch_size=batch_size,
            shuffle=True,
        )

        self.train()
        for _ in range(epochs):
            for batch_x, batch_y in loader:
                # Compute prediction error
                loss = self.loss_fn(self(batch_x), batch_y)

                # Backpropagation
                self.optimizer.zero_grad()
                loss.backward()

                # Clamp every gradient element to [-1, 1]. The utility skips
                # parameters without a gradient (e.g. frozen ones) instead of
                # failing on ``param.grad`` being None.
                torch.nn.utils.clip_grad_value_(self.parameters(), clip_value=1.0)

                self.optimizer.step()
                
# Smoke test: instantiate a small network so the cell output shows its layers.
# NOTE(review): the constructor reads input_shape[1], which is 2 for (1, 2, 9, 9),
# so this instance has 2 input features — confirm that is intended.
NeuralNetwork((1,2,9,9), 3, 64)

In [None]:
class DQNAgent:
    """Epsilon-greedy Q-learning agent with replay memory and a target network.

    Args:
        model: Online network. It must expose ``input_size``, ``output_size``,
            ``input_shape``, ``predict``, ``batch_train``, ``state_dict`` and
            ``load_state_dict``. A deep copy of it becomes the target network.
        epsilon: Initial probability of picking a random action in ``act``.
        memory_size: Maximum number of transitions kept in the replay memory.
    """

    # Column layout of one remembered transition (see ``remember``).
    MEMORY_COLUMNS = ['state', 'next_state', 'action', 'reward', 'done', 'difficulty', 'ticks']

    def __init__(self, model: "NeuralNetwork", epsilon=0.99, memory_size=20_000):
        self.state_size = model.input_size
        self.action_size = model.output_size
        self.state_shape = model.input_shape

        self.memory = pd.DataFrame()
        self.memory_size = memory_size
        self.gamma = 0.99   # discount rate
        self.epsilon = epsilon
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.997

        # create prediction and target networks
        self.model = model
        self.target_model = copy.deepcopy(model)

    def memory_stats(self):
        """Summary statistics over the replay memory.

        Returns:
            Dict with ``mean_difficulty``, ``median_difficulty`` and
            ``mean_ticks``; all NaN while the memory is still empty.
        """
        if self.memory.empty:
            nan = float('nan')
            return {
                'mean_difficulty': nan,
                'median_difficulty': nan,
                'mean_ticks': nan,
            }
        return {
            'mean_difficulty': self.memory['difficulty'].mean(),
            'median_difficulty': self.memory['difficulty'].median(),
            'mean_ticks': self.memory['ticks'].mean(),
        }

    def remember(self, arr):
        """Append transitions to the replay memory, keeping the newest ``memory_size``.

        Args:
            arr: Sequence of rows laid out as ``MEMORY_COLUMNS``.
        """
        newmem = pd.DataFrame(arr, columns=self.MEMORY_COLUMNS)
        if self.memory.empty:
            # Nothing to merge with yet; avoids concatenating an empty frame.
            combined = newmem
        else:
            combined = pd.concat([self.memory, newmem], ignore_index=True)
        self.memory = combined.tail(self.memory_size)

    def act(self, state, explore=True):
        """Choose an action for ``state``.

        Args:
            state: Batch of one state, as accepted by ``model.predict``.
            explore: When true, a random action is taken with probability
                ``epsilon``; otherwise the greedy action is always returned.

        Returns:
            The action index as a plain ``int``.
        """
        if explore and np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        act_values = np.asarray(self.model.predict(state))
        # Plain int keeps the return type identical to the random branch.
        return int(np.argmax(act_values[0]))

    def replay(self, batch, train_batch_size=256):
        """Train the online network on a random sample of remembered transitions.

        Does nothing until the memory holds at least ``batch`` transitions.
        After a training pass, epsilon is decayed but never below
        ``epsilon_min``.

        Args:
            batch: Number of transitions to sample from memory.
            train_batch_size: Mini-batch size passed to ``model.batch_train``.
        """
        if len(self.memory) < batch:
            return

        sample = self.memory.sample(batch)

        # get state, next_state as numpy arr
        state = np.vstack(sample['state'].tolist()).reshape(self.state_shape)
        next_state = np.vstack(sample['next_state'].tolist()).reshape(self.state_shape)

        rewards = sample['reward'].to_numpy(dtype=np.float32)
        done = sample['done'].to_numpy(dtype=bool)
        actions = sample['action'].to_numpy(dtype=np.int64)

        # Bootstrapped target from the target network; terminal transitions
        # use the direct reward only.
        next_q = np.asarray(self.target_model.predict(next_state))
        targets = np.where(done, rewards, rewards + self.gamma * next_q.max(axis=1))

        # Start from the online network's own predictions so that only the
        # taken action's value contributes to the loss.
        q = np.array(self.model.predict(state), dtype=np.float32)
        q[np.arange(len(q)), actions] = targets

        self.model.batch_train(state, q, epochs=1, batch_size=train_batch_size)

        if self.epsilon > self.epsilon_min:
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    # updates target network weights
    def update_weights(self):
        """Copy the online network's weights into the target network."""
        self.target_model.load_state_dict(self.model.state_dict())

    def load(self, path):
        """Restore model weights from ``path`` and sync the target network.

        Returns:
            The score stored alongside the weights.
        """
        # NOTE: torch.load unpickles data; only load checkpoints you trust.
        store = torch.load(path)
        self.model.load_state_dict(store['model_state'])
        self.model.eval()
        self.update_weights()
        return store['score']

    def save(self, path, score):
        """Write the online network's weights and ``score`` to ``path``."""
        torch.save({
            'model_state': self.model.state_dict(),
            # Stored as a built-in float so the checkpoint does not embed
            # NumPy scalar objects.
            'score': float(score),
        }, path)

# Standalone module-level agent with 64-unit hidden layers, sized from `game`.
# NOTE(review): Trainer instances build their own DQNAgent, so this object is
# separate from any agent a Trainer trains.
agent = DQNAgent(
    NeuralNetwork(game.state_shape(), game.action_size(), 64),
    epsilon=0.99
)
print("created agent")

In [None]:
class Trainer:
    """Owns an environment and a DQN agent and runs training steps on them.

    Args:
        env: Environment exposing ``reset``, ``step``, ``score``, ``draw``,
            ``state_shape``, ``action_size`` and ``game.tick_count``.
        mem_batch: Number of transitions sampled from memory per training step.
        moving_scores_n: Window length of the moving average of step scores.
        episode_max_len: Maximum number of actions per episode.
        step_episode_count: Number of episodes played per ``step`` call.
        epsilon: Initial exploration rate of the agent.
        hidden_layers: Number of hidden layers of the agent's network.
        hidden_layer_size: Width of each hidden layer.
        load: When true, restore weights and best score from ``<name>.pk``.
        target_update_every: The target network is synced on every
            ``target_update_every``-th call to ``step`` (starting with the first).
    """

    def __init__(self, env,
                 mem_batch=1000,
                 moving_scores_n = 10,
                 episode_max_len = 500,
                 step_episode_count = 10,
                 epsilon=0.99,
                 hidden_layers=3,
                 hidden_layer_size=64,
                 load=False,
                 target_update_every=20,
                ):
        self.name = f"nn_{hidden_layers}x{hidden_layer_size}_m{mem_batch}"

        self.env = env
        self.mem_batch = mem_batch
        self.moving_scores = deque(maxlen=moving_scores_n)
        self.episode_max_len = episode_max_len
        self.step_episode_count = step_episode_count
        self.best_score = 0

        # Number of completed step() calls; drives the target-network sync
        # schedule without relying on any variable outside this object.
        self.steps_done = 0
        self.target_update_every = target_update_every

        # create our agent
        self.agent = DQNAgent(
            NeuralNetwork(env.state_shape(), env.action_size(), hidden_layers=hidden_layers, hidden_layer_size=hidden_layer_size),
            epsilon=epsilon
        )

        if load:
            self.best_score = self.agent.load(f'{self.name}.pk')

    def epsilon(self):
        """Current exploration rate of the agent."""
        return self.agent.epsilon

    def step(self, visual=False):
        """Play ``step_episode_count`` episodes, store them and train once.

        Args:
            visual: When true, the environment is drawn after every action.

        Returns:
            Tuple ``(score_median, moving_score)``: the median episode score of
            this step and the mean of the recent step medians.
        """
        # NOTE(review): maxlen is episode_max_len although the buffer spans all
        # episodes of this step, so only the most recent episode_max_len
        # transitions reach the agent — confirm this is intended.
        emem = deque(maxlen=self.episode_max_len)
        scores = []

        for _ in range(self.step_episode_count):
            state = self.env.reset()
            score = 0
            for _ in range(self.episode_max_len):
                action = self.agent.act(state)
                next_state, reward, done = self.env.step(action)

                # introduce difficulty metric to keep track on what is the difficulty in the memory
                difficulty = self.env.score()
                ticks = self.env.game.tick_count()

                # create flat array of variables from this step and add them to episode memory
                emem.append([[state.reshape(-1)], [next_state.reshape(-1)], action, reward,  done, difficulty, ticks])
                score += reward
                state = next_state

                if visual:
                    self.env.draw()

                if done:
                    break

            scores.append(score)

        # give episode memory list to agent
        self.agent.remember(emem)

        score_median = np.median(scores)
        self.moving_scores.append(score_median)
        moving_score = np.mean(self.moving_scores)

        if score_median > self.best_score:
            self.best_score = score_median
            self.agent.save(f'{self.name}.pk', self.best_score)

        # Periodically copy the online weights into the target network.
        if self.steps_done % self.target_update_every == 0:
            self.agent.update_weights()
        self.steps_done += 1

        # train the agent
        self.agent.replay(self.mem_batch)

        return score_median, moving_score

In [None]:
# Build the list of trainers (one per network configuration) to train side by side.
# Number of iterations of the training loop in the next cell.
EPISODES = 3000

# NOTE: load=True makes Trainer read '<name>.pk' on construction, so that
# checkpoint file has to exist before this cell is run.
trainers = [
    Trainer(
        Game(9, 9),
        hidden_layer_size=512,
        hidden_layers=3,
        mem_batch=4096,
        load=True,
        epsilon=0.1
    ),
]

print('Best scores:')
# Cell output: (name, best score) for every trainer.
[(t.name, t.best_score) for t in trainers]

In [None]:
# Live plot: one sub-plot per metric, one line per trainer.
pp = ProgressPlot(
    plot_names=["epsilon", "score", "moving average", "best score","mean difficulty", "mean ticks"],
    line_names=[t.name for t in trainers],
    width=2000,
    height=1000,
)

# History of the values sent to the plot, one entry per loop iteration.
plot_log = []

# NOTE(review): keep the loop variable named `e`; Trainer.step may read a
# module-level `e` to schedule target-network updates — verify before renaming.
for e in range(EPISODES):
    # Exploration rates are captured before the step, which may decay them.
    eps = [t.epsilon() for t in trainers]
    
    # Each step returns (median score, moving average) for one trainer.
    step_info = [t.step() for t in trainers]
    
    score = [s[0] for s in step_info]
    moving_avg = [s[1] for s in step_info]
    
    best = [t.best_score for t in trainers]
    
    # Replay-memory statistics per trainer.
    stats = [t.agent.memory_stats() for t in trainers]
    diff = [s['mean_difficulty'] for s in stats]
    ticks = [s['mean_ticks'] for s in stats]
    
    plot_log.append(list(np.dstack((eps, score, moving_avg, best, diff, ticks))))
    
    # Values are passed in the same order as plot_names above.
    pp.update([eps, score, moving_avg, best, diff, ticks])

pp.finalize()

In [None]:
import time

# Watch the agent play with exploration switched off. The outer loop never
# terminates on its own; interrupt the kernel to stop it.
# NOTE(review): `agent` is the standalone module-level agent, not one held by
# a Trainer — confirm it is the agent you want to watch.
while True:
    done = False
    state = game.reset()
    while not done:
        action = agent.act(state, False)
        state, _, done = game.step(action)
        game.draw()
        time.sleep(0.05)  # short pause so each frame is visible
    print(game.score())
    time.sleep(1)  # pause on the final board before restarting

In [None]:
# save to onnx

# Rebuild a trainer with the target architecture; load=True restores its
# weights from the '<name>.pk' checkpoint.
t0 = Trainer(
        Game(9, 9),
        hidden_layer_size=512,
        hidden_layers=3,
        mem_batch=4096,
        load=True,
        epsilon=0.1
)

# Switch the model that is actually exported to eval mode, and size the dummy
# input from the model instead of hard-coding the feature count.
export_model = t0.agent.model
export_model.eval()
torch.onnx.export(
    export_model,
    torch.randn(1, export_model.input_size),
    "snake.onnx",
    export_params=True,        # store the trained parameter weights inside the model file
)

In [None]:
# Sanity check: show the shape of the state array produced by the module-level `game`.
game.state().shape