In [1]:
import os
import time

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
%matplotlib inline

from GameOfLife import StandardEngine
from GOLCNN import OPNet
from MinimalSolution import MinNet

# Device that the models in this notebook are moved to via `.to(device)`.
# NOTE: train_epoch() and test_model() have their own `device="cuda"` default
# and are called below without an explicit device, so changing this value
# alone does not change where the batches are sent.
device = "cuda"

In [2]:
def generateDataset(dataSetSize=1000, size=32, n_steps=2, returnTensor=False):
    """Generate random Game of Life boards paired with their evolved states.

    Every sample is a random binary ``size x size`` board together with the
    board obtained by assigning it to a ``StandardEngine`` and calling
    ``step_n(n_steps)``.

    Args:
        dataSetSize: Number of (start, evolved) pairs to generate.
        size: Side length of each square board.
        n_steps: Number of steps passed to ``engine.step_n``.
        returnTensor: If True, return the ``TensorDataset`` itself; otherwise
            wrap it in a ``DataLoader`` created with default arguments
            (batch size 1, no shuffling).

    Returns:
        A ``TensorDataset`` or a ``DataLoader`` over it. Inputs have shape
        ``(dataSetSize, 1, size, size)``. Targets are left without a channel
        axis, i.e. ``(dataSetSize, size, size)`` assuming the engine returns
        boards of the same shape it was given.
    """
    # Size the engine's initial board from `size` instead of a fixed 32x32 so
    # the engine is constructed consistently with the boards assigned below.
    init_state = np.zeros((size, size))
    engine = StandardEngine(init_state)

    inputs = []
    outputs = []

    for _ in range(dataSetSize):
        start_state = np.random.randint(0, 2, (size, size))
        inputs.append(start_state)

        # Overwrite the engine's board and advance it n_steps generations.
        engine.game_state = start_state
        engine.step_n(n_steps)
        outputs.append(engine.game_state)

    inputs = np.array(inputs).reshape((dataSetSize, 1, size, size))
    # Targets intentionally keep their stacked shape (no channel axis): the
    # training/evaluation helpers in this notebook add or index that axis
    # themselves. The original code computed a reshaped copy into a misspelled,
    # never-used variable; that dead assignment has been dropped.
    outputs = np.array(outputs)

    tensor_x = torch.Tensor(inputs)
    tensor_y = torch.Tensor(outputs)

    dataSet = TensorDataset(tensor_x, tensor_y)
    if returnTensor:
        return dataSet

    return DataLoader(dataSet)

In [3]:
def train_epoch(model, opt, criterion, trainloader, filters, device="cuda", batch_size=64):
    """Run one optimisation pass over ``trainloader`` and return the mean loss.

    Args:
        model: Network to train; called on inputs whose channel axis has been
            repeated ``filters`` times.
        opt: Optimizer whose ``zero_grad``/``step`` are called once per batch.
        criterion: Loss function called as ``criterion(prediction, target)``.
        trainloader: Iterable yielding ``(x_batch, y_batch)`` tensor pairs.
        filters: Number of times each input is repeated along dimension 1.
        device: Device the batches are moved to before the forward pass.
        batch_size: Unused; kept so existing callers keep working. The actual
            batch size is whatever ``trainloader`` yields.

    Returns:
        The mean of the per-batch loss values (NaN if the loader is empty).
    """
    model.train()
    losses = []

    for x_batch, y_batch in trainloader:
        # Move first, then repeat, so only the un-repeated tensor is transferred.
        x_batch = x_batch.to(device).repeat(1, filters, 1, 1)
        y_batch = y_batch.to(device)
        # Targets may arrive without a channel axis. Insert it at dim 1 so
        # they line up with the model output for ANY batch size; prepending an
        # axis at dim 0 is only equivalent when the batch size is exactly 1 and
        # otherwise makes the loss broadcast across samples.
        if y_batch.dim() == x_batch.dim() - 1:
            y_batch = y_batch.unsqueeze(1)

        opt.zero_grad()
        y_hat = model(x_batch)
        loss = criterion(y_hat, y_batch)
        loss.backward()
        opt.step()
        # .item() yields a plain Python float without keeping the graph alive.
        losses.append(loss.item())

    return np.mean(losses)

In [4]:
def test_model(model, test_loader, filters, criterion, device="cuda"):
    model.eval()
    size = len(dataloader.dataset)
    losses = []
    num_correct = 0
    
    for x_batch, y_batch in test_loader:
        x_batch = x_batch.repeat(1,filters,1,1)
        x_batch = x_batch.to(device)
        y_batch = y_batch.to(device)
        
        batch_outputs = model(x_batch)
        loss = criterion(batch_outputs[0], y_batch)
        
        batch_preds = batch_outputs[0][0].cpu().detach().numpy()
        batch_preds = batch_preds.round()
        y_batch = y_batch[0].cpu().detach().numpy()
        
        num_correct += np.all(np.equal(batch_preds, y_batch))
        losses.append(loss.cpu().data.numpy())
    
    avg_loss = np.mean(losses)
    acc = num_correct / size
    num_wrong = size - num_correct
    
    return acc, loss, num_correct, num_wrong

In [5]:
# Sanity check: make sure test_model() works by running it on the minimal
# solution CNN (MinNet) against freshly generated 3-step data, before any
# training happens.
dataset_size = 1000
dataloader = generateDataset(dataSetSize=dataset_size, size=32, n_steps=3)
# NOTE(review): presumably MinNet's argument is the number of Game of Life
# steps (it matches n_steps=3 above) — confirm against MinimalSolution.
min_model = MinNet(3)
min_model.to(device)
criterion = nn.MSELoss()
# filters=1: inputs are repeated once along the channel axis, i.e. unchanged.
acc, epoch_test_loss, num_correct, num_wrong = test_model(min_model, dataloader, 1, criterion)
print(f'Accuracy: {acc}, Test Loss: {epoch_test_loss}, Correct: {num_correct}/{dataset_size}, Incorrect: {num_wrong}/{dataset_size}')

Accuracy: 1.0, Test Loss: 3.2734212768004914e-18, Correct: 1000/1000, Incorrect: 0/1000


In [6]:
# Training Parameters
learning_rate = 1e-3
# NOTE(review): batch_size is not passed to generateDataset() or train_epoch()
# in the visible cells, and generateDataset() builds its DataLoader with the
# default batch size, so this value currently has no effect — confirm intent.
batch_size = 100
epochs = 1000

m = 16 # Overparameterization Factor
n = 2  # Steps of GOL
# The same `m` is later passed to train_epoch()/test_model() as `filters`,
# i.e. the number of times each input board is repeated along the channel axis.
model = OPNet(m, n)
criterion = nn.MSELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

In [7]:
# Two-era training schedule: era 1 trains with `learning_rate`, era 2 with a
# tenth of it. Both eras share the same loop body, so they are driven by one
# loop over the per-era learning rates instead of two copy-pasted loops.
loss = []  # evaluation loss recorded every `eval_every` epochs, across both eras

model.to(device)

samples_per_epoch = 1000  # fresh random boards are generated for every epoch
eval_every = 100          # evaluate and log every this many epochs
era_learning_rates = (learning_rate, learning_rate * 0.1)

for era, era_lr in enumerate(era_learning_rates, start=1):
    # A new plain-SGD optimizer per era; it carries no state between steps,
    # so re-creating it for era 1 matches using one built earlier with the
    # same learning rate.
    optimizer = torch.optim.SGD(model.parameters(), lr=era_lr)

    for t in range(epochs):
        # Use `n` so the data always matches the step count the model was built for.
        dataloader = generateDataset(dataSetSize=samples_per_epoch, size=32, n_steps=n)
        epoch_train_loss = train_epoch(model, optimizer, criterion, dataloader, m)

        if t % eval_every == 0:
            # NOTE(review): this evaluates on the same loader the model was
            # just trained on, so "Test Loss" is not a held-out measurement —
            # consider generating a separate evaluation set.
            acc, epoch_test_loss, num_correct, num_wrong = test_model(model, dataloader, m, criterion)
            loss.append(epoch_test_loss)
            print(f'Epoch: {t+1}/{epochs}, Test Loss: {epoch_test_loss}, Incorrect: {num_wrong}/{samples_per_epoch} examples')

    print(f"END OF ERA {era}")

print("DONE!")

Epoch: 1/1000, Test Loss: 0.24124956130981445, Incorrect: 1000/1000 examples
Epoch: 101/1000, Test Loss: 0.1871577501296997, Incorrect: 1000/1000 examples
Epoch: 201/1000, Test Loss: 0.18195250630378723, Incorrect: 1000/1000 examples
Epoch: 301/1000, Test Loss: 0.16713085770606995, Incorrect: 1000/1000 examples
Epoch: 401/1000, Test Loss: 0.13074202835559845, Incorrect: 1000/1000 examples
Epoch: 501/1000, Test Loss: 0.13080167770385742, Incorrect: 1000/1000 examples
Epoch: 601/1000, Test Loss: 0.10869193077087402, Incorrect: 1000/1000 examples
Epoch: 701/1000, Test Loss: 0.08300051093101501, Incorrect: 1000/1000 examples
Epoch: 801/1000, Test Loss: 0.040355950593948364, Incorrect: 1000/1000 examples
Epoch: 901/1000, Test Loss: 0.00741781760007143, Incorrect: 677/1000 examples
END OF ERA 1
Epoch: 1/1000, Test Loss: 0.0008659190498292446, Incorrect: 5/1000 examples
Epoch: 101/1000, Test Loss: 0.0006141755147837102, Incorrect: 5/1000 examples
Epoch: 201/1000, Test Loss: 0.0006999216857366

KeyboardInterrupt: 

In [8]:
# Persist the trained network. The file name is derived from the
# overparameterization factor `m` and step count `n` so it cannot drift from
# the model that was actually trained (for m=16, n=2 this is the same path as
# before). The target directory is created first so a missing folder does not
# throw away a long training run.
# NOTE: torch.save(model, ...) pickles the whole module object, so loading it
# requires the model's class to be importable; kept as-is for compatibility
# with existing loaders.
os.makedirs('./models', exist_ok=True)
torch.save(model, f'./models/op_m{m}_n{n}_model1.pt')