# Imports and Setup

In [None]:
# Some basic imports that we will need for the data
import numpy as np
import pandas as pd
import math

# some basic imports that we will need for ML
import torch
import torch.nn as nn 
import torch.nn.functional as F
import torch.optim as optim
import tensorflow as tf

In [None]:
# get the life step function that will be used to score submissions

def life_step(X):
    """Advance a wrap-around Game of Life board by one generation.

    Neighbours are counted by rolling the board along both axes, so the
    edges wrap (cells on one border neighbour cells on the opposite one).

    Args:
        X: 2-D array of cell states (0/1 integers or booleans).

    Returns:
        Array of the same shape holding the next generation: a cell is
        alive if it has exactly three live neighbours, or if it is
        currently alive and has exactly two.
    """
    # the eight neighbour offsets, in the same order the rolls are summed
    offsets = [(di, dj)
               for di in (-1, 0, 1)
               for dj in (-1, 0, 1)
               if (di, dj) != (0, 0)]

    live_neighbours = 0
    for di, dj in offsets:
        live_neighbours = live_neighbours + np.roll(np.roll(X, di, 0), dj, 1)

    born = live_neighbours == 3
    survives = X & (live_neighbours == 2)
    return born | survives

In [None]:
# Demo: a horizontal blinker on a 5x5 board, stepped three times with life_step.
# The printed boards should alternate between a vertical and a horizontal bar.

X = np.zeros((5, 5), dtype=int)
X[2, 1:4] = 1  # three live cells in the middle row
print(X)

for i in range(3):
    X = life_step(X)
    print(X)

In [None]:
# Load the competition CSVs into pandas DataFrames.
train_file = pd.read_csv('/kaggle/input/conways-reverse-game-of-life-2020/train.csv')
test_file = pd.read_csv('/kaggle/input/conways-reverse-game-of-life-2020/test.csv')

# Expected layout (to be confirmed by the shapes printed below):
#   - 50,000 games in each file
#   - train row: id, delta (number of time steps), 625 start cells,
#     625 stop cells -> 1252 columns
#   - test row: the same without the start cells -> 627 columns
for frame in (train_file, test_file):
    print(frame.shape)

In [None]:
# let's take a peek at our training data:
# the first four rows, showing a handful of columns picked by position
# from the front, the middle and the end of the frame
train_file.iloc[0:4, [0, 1, 2, 3, 626, -624, -3, -2, -1]]

In [None]:
# and a quick peek at our test data:
# the first four rows, showing the first four and the last three columns
test_file.iloc[0:4, [0, 1, 2, 3, -3, -2, -1]]

In [None]:
# Split the raw frames into the NumPy arrays used by the rest of the notebook.

# board columns only (no id, no delta)
feature_frame = train_file.drop(columns=["id", "delta"])
is_start = feature_frame.columns.str.startswith('start_')
is_stop = feature_frame.columns.str.startswith('stop_')

# end-state boards of the training games (everything except the start_ columns)
train_data = feature_frame.loc[:, ~is_start].to_numpy()

# start-state boards of the training games (everything except the stop_ columns)
train_starts = feature_frame.loc[:, ~is_stop].to_numpy()

# number of time steps of each training game, as a column vector
train_deltas = train_file[['delta']].to_numpy()

# end-state boards of the test games
test_data = test_file.drop(columns=["id", "delta"]).to_numpy()

# Full rows without the id: element 0 is the delta, the rest is board data.
test_sets = test_file.drop(columns=["id"]).to_numpy()
train_sets = train_file.drop(columns=["id"]).to_numpy()

# shuffle the training rows in place so that slices are random samples
np.random.shuffle(train_sets)

In [None]:
# Example: how one row of train_sets / test_sets is unpacked.

# training row: [delta, 625 start cells, remaining stop cells]
first_train = train_sets[0]
delta, start, end = first_train[0], first_train[1:626], first_train[626:]
print("delta: %d" % delta)
print("start shape: %d" % start.shape)
print("end shape: %d" % end.shape)

# test row: [delta, stop cells]
first_test = test_sets[0]
delta, end = first_test[0], first_test[1:]
print("delta: %d" % delta)
print("end shape: %d" % end.shape)

In [None]:
# create a function to find if a starting setup is correct
# for a given ending setup

# This didn't get used in the end, for two reasons:
# first, it requires a GPU kernel that is currently under development. Rolls are currently converted to np arrays, which is a problem.
# second, I'm not confident enough in my ability to handle gradients correctly when passing tensors through this "layer".

def getError(startGame, endGame, numTimeSteps):
    """Count how far a start board is from producing a given end board.

    The flat start board is reshaped to a square, advanced ``numTimeSteps``
    generations with ``life_step``, and compared to the (reshaped) end board.

    Args:
        startGame: flat array of cell states; its length is taken to be a
            perfect square.
        endGame: flat array of the same length holding the target state.
        numTimeSteps: number of generations to simulate.

    Returns:
        Sum of squared element-wise differences between the simulated board
        and the target (0 means an exact match).
    """
    side = round(math.sqrt(len(startGame)))
    board = startGame.reshape((side, side))
    target = endGame.reshape((side, side))

    for _ in range(numTimeSteps):
        board = life_step(board)

    return np.sum(np.square(board - target))


In [None]:
# Sanity check for getError: every training start board, stepped forward by
# its delta, should reproduce its end board, so the accumulated error over
# the whole training set should come out as 0.
totalError = 0

for startGame, endGame, timeStep in zip(train_starts, train_data, train_deltas):
    # train_deltas rows are length-1 arrays, hence the [0]
    totalError += getError(startGame, endGame, timeStep[0])

totalError

# Neural Network for the forward Direction

The Game of Life is a lot simpler in the forward direction - after all, it is deterministic!

Consider this kernel:

1 1 1

1 9 1

1 1 1

If the result is 3, 11, or 12, the cell in the next time step should be alive.
If the result is anything else, it should be dead.

3 represents a dead cell touching exactly 3 living cells - bring it to life!
11 and 12 represent a living cell touching 2 (11) or 3 (12) other living cells,
so this cell should remain alive in the next time step.

So this simple neural network should handle one time step perfectly!

I did some reading here, though mine is *slightly* different. There are a number of ways to set up this convolution "perfectly":

https://medium.com/@tomgrek/evolving-game-of-life-neural-networks-chaos-and-complexity-94b509bc7aa8

The idea, if I can get it working, is to use this to build something similar to (though not quite the same as) an autoencoder. One neural network predicts the start state; then, with this forward network acting as essentially one more layer, the predicted start state is used to regenerate the end state. Both the predicted start state and the regenerated end state produce a loss that I can use to train my model.

While I was closer to getting this working than the above error function, I still couldn't get it working in the end. :( The reshaping, gradient handling, etc, was just beyond me.

In [None]:
# Let's set up a neural network on a CUDA device when one is available;
# otherwise fall back to the CPU so the notebook still runs without a GPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
class forwardGame(nn.Module):
  """Fixed (non-trainable) convolution that plays the Game of Life forwards.

  A single 3x3 circular convolution with the kernel

      1 1 1
      1 9 1
      1 1 1

  maps every cell to ``neighbours + 9 * self``. A result of 3 (dead cell,
  three live neighbours), 11 or 12 (live cell with two or three live
  neighbours) means the cell is alive in the next generation.
  """

  def __init__(self):
    super().__init__()
    self.step = nn.Conv2d(in_channels=1, out_channels=1, kernel_size=3, padding=1, padding_mode='circular')
    # https://discuss.pytorch.org/t/setting-custom-kernel-for-cnn-in-pytorch/27176/2
    kernel = torch.tensor([[1., 1., 1.], [1., 9., 1.], [1., 1., 1.]]).view(1, 1, 3, 3)
    with torch.no_grad():
      self.step.weight.copy_(kernel)
      # the default bias is randomly initialised; zero it so the convolution
      # returns the exact neighbour code
      self.step.bias.zero_()

    # this layer doesn't need to change... so don't let it!
    # (the flag has to be set on each parameter, not on the module)
    for param in self.parameters():
      param.requires_grad = False

  def forward(self, x, delta):
    """Advance the board(s) in ``x`` by ``delta`` generations.

    Args:
      x: float tensor fed to a 1-channel Conv2d, i.e. shaped
        (batch, 1, height, width), holding 0/1 cell states.
      delta: number of generations to simulate.

    Returns:
      Float32 tensor of the same shape, on the same device as ``x``,
      holding the 0/1 states after ``delta`` steps (``x`` itself when
      ``delta`` is 0).
    """
    for _ in range(delta):
      # round, since we won't always be dealing with pretty numbers
      # e.g. when we reconstruct, there will be a variety of numbers
      code = self.step(x).round()
      alive = (code == 3) | (code == 11) | (code == 12)
      # .float() keeps the tensor on its current device, so the next
      # convolution step also works when the module lives on the GPU
      x = alive.float()

    return x

In [None]:
def prepForConv(x):
  dim = round(math.sqrt(len(x)))
  return torch.from_numpy(x).type('torch.FloatTensor').reshape((dim, dim))[None, None, ...]

In [None]:
# Try the fixed forward network on the blinker from before.
forwardCNN = forwardGame()

# flat 5x5 board with three live cells in the middle row;
# it should alternate between a vertical and a horizontal line
blinker = np.zeros(25, dtype=int)
blinker[11:14] = 1
start = prepForConv(blinker)

print(start)
for i in range(3):
  start = forwardCNN(start, 1)
  print(start)

# Neural Network



In [None]:
'''
# first, not so successful attempt
model = nn.Sequential(
    nn.Conv2d(in_channels=1, out_channels=1, kernel_size=3, padding_mode = 'circular'),
    nn.ReLU(),
    nn.Conv2d(in_channels=1, out_channels=1, kernel_size=3, padding_mode = 'circular'),
    nn.ReLU(),
    nn.Flatten(),
    nn.Linear(in_features = (441), out_features = 625),
    nn.ReLU(),
    nn.Linear(in_features = 625, out_features = 625),
    nn.ReLU(),
    nn.Linear(in_features = 625, out_features = 625),
    nn.ReLU(),
    nn.Linear(in_features = 625, out_features = 625),
    
).to(device)
'''

class oneBack(nn.Module):
  """Three 5x5 circular convolutions meant to undo one time step.

  The block maps ``channels`` -> ``mid_channels`` -> ``mid_channels`` ->
  ``channels`` (256 and 512 by default, the wide layers borrowed from
  VGG16). Because it ends with the channel count it starts with, it can
  be applied repeatedly.
  """

  def __init__(self, channels = 256, mid_channels = 512):
    super().__init__()
    # a 5x5 kernel with padding 2 keeps the spatial size unchanged;
    # padding is circular because the board wraps around by problem definition
    conv_kwargs = dict(kernel_size=5, padding=2, padding_mode='circular')
    self.convIn = nn.Conv2d(channels, mid_channels, **conv_kwargs)
    self.convMid = nn.Conv2d(mid_channels, mid_channels, **conv_kwargs)
    self.convOut = nn.Conv2d(mid_channels, channels, **conv_kwargs)
    self.actv = nn.ReLU()

  def forward(self, x):
    """Apply the three convolutions in order, each followed by a ReLU."""
    for conv in (self.convIn, self.convMid, self.convOut):
      x = self.actv(conv(x))
    return x

class reverseTime(nn.Module):
  """Predict the start board of a Game of Life run from its end board.

  The end board is lifted to ``channels`` feature maps, passed through the
  shared ``oneBack`` block once per time step, collapsed back to a single
  map and refined by two fully connected layers of 625 units.

  The network returns raw logits (no final activation), which is what
  ``nn.BCEWithLogitsLoss`` expects; apply a sigmoid to obtain per-cell
  probabilities.
  """

  def __init__(self, channels = 256, mid_channels = 512):
    super().__init__()
    self.actv = nn.ReLU()
    self.convIn = nn.Conv2d(1, channels, 5, padding=2, padding_mode = 'circular')
    self.oneBack = oneBack(channels, mid_channels)
    self.convOut = nn.Conv2d(channels, 1, 5, padding=2, padding_mode = 'circular')
    self.toLinear = nn.Flatten()
    self.fc = nn.Linear(in_features = 625, out_features = 625)
    self.toStart = nn.Linear(in_features = 625, out_features = 625)
    # finalActv and rebuild are not used by forward(); they are kept so the
    # module's attributes and state_dict stay as they were
    self.finalActv = nn.Sigmoid()
    self.rebuild = forwardGame()

  def forward(self, x, delta):
    """Run the network backwards through ``delta`` time steps.

    Args:
      x: float tensor shaped (batch, 1, height, width) with
        height * width == 625 (required by the 625-unit linear layers).
      delta: number of time steps between the start and the end board.

    Returns:
      Tensor of shape (batch, 625) holding one logit per start cell.
    """
    # first, get x ready to reverse back one step at a time
    x = self.actv(self.convIn(x))

    # take it back, one time step at a time (the same block is reused)
    for _ in range(delta):
      x = self.oneBack(x)

    # transform it back to one layer of 625, then flatten it
    x = self.actv(self.convOut(x))
    x = self.toLinear(x)

    x = self.actv(self.fc(x))

    # No activation on the last layer: a ReLU here would clamp every logit
    # to >= 0, so sigmoid(logit) could never go below 0.5 and a dead cell
    # could never be predicted with confidence.
    return self.toStart(x)

# build the reverse-time network with its default channel widths and move it to the selected device
model = reverseTime().to(device)

In [None]:
# Loss function.
# The first (failed) attempt used mean squared error:
#criterion = nn.MSELoss()
# Binary logistic loss makes a lot more sense,
# especially since the targets are 0's and 1's.
criterion = nn.BCEWithLogitsLoss()

# hyperparameters
batch_size, epochs, learning_rate = 50, 100, 0.005

# Adam with a small L2 penalty over all model parameters
optimizer = optim.Adam(
    params=model.parameters(),
    lr=learning_rate,
    weight_decay=1e-5,
)

# shuffled mini-batch iterator over train_data
train_iterator = torch.utils.data.DataLoader(
    train_data,
    batch_size=batch_size,
    shuffle=True,
)

In [None]:
# setup a basic training function
def train(model, device, train_data, optimizer, criterion, ):
    
    epoch_loss = 0
    
    model.train()
    
    num_tests = 0
    
    losses = []
    
    for x in train_data:
        delta = x[0]
        x_start = x[1:626]
        x_end = x[626:]
        dim = round(math.sqrt(len(x_start)))
        x_end_t = torch.from_numpy(x_end).type('torch.FloatTensor').reshape((dim, dim))[None, None, ...].to(device)
        x_start_t = torch.from_numpy(x_start).type('torch.FloatTensor')[None, ...].to(device)
        
    
        optimizer.zero_grad()
        
        pred_start = model(x_end_t, delta)

        loss = criterion(pred_start, x_start_t)
        
        
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        losses.append(loss.item())
        num_tests += 1
        
    return epoch_loss / num_tests, losses

# setup a basic validation function
def evaluate(model, device, train_data, optimizer, criterion):
    
    epoch_loss = 0
    losses = []
    
    model.eval()
    
    num_tests = 0
    with torch.no_grad():
        for x in train_data:
            delta = x[0]
            x_start = x[1:626]
            x_end = x[626:]
            dim = round(math.sqrt(len(x_start)))
            x_end_t = torch.from_numpy(x_end).type('torch.FloatTensor').reshape((dim, dim))[None, None, ...].to(device)
            x_start_t = torch.from_numpy(x_start).type('torch.FloatTensor')[None, ...].to(device)

            pred_start = model(x_end_t, delta)

            loss = criterion(pred_start, x_start_t)
            
            epoch_loss += loss.item()
            losses.append(loss.item())
            num_tests += 1

    return epoch_loss / num_tests, losses

In [None]:
# set up a basic prediction function
# this is what will take the input, and give the final output to submit
def predict(model, device, val_data):
    """Predict a binary start board for every row of ``val_data``.

    Rows use the layout ``[delta, 625 end cells]``. The model is trained with
    ``BCEWithLogitsLoss``, so its outputs are treated as logits: they are
    passed through a sigmoid and thresholded at 0.5, which guarantees that
    every predicted cell is exactly 0 or 1.

    Args:
        model: module called as ``model(end_board, delta)`` returning a
            ``(1, n_cells)`` tensor of logits.
        device: torch device the input tensors are moved to.
        val_data: iterable of 1-D NumPy rows in the layout above.

    Returns:
        List with one float32 NumPy array of 0.0/1.0 values per input row.
    """
    results = []

    model.eval()

    with torch.no_grad():
        for x in val_data:

            delta = x[0]
            curr_game = x[1:626]
            dim = round(math.sqrt(len(curr_game)))

            board = torch.from_numpy(curr_game).float().reshape(1, 1, dim, dim).to(device)
            logits = model(board, delta)

            # rounding raw logits can yield values outside {0, 1};
            # threshold the probability instead
            alive = (torch.sigmoid(logits) > 0.5).float()

            results.append(alive[0].detach().cpu().numpy())

    return results

# Train the model

In [None]:
# The shuffled training rows are cut into `epochs` consecutive, disjoint
# chunks, so each "epoch" sees its own slice of the data (this is
# essentially a batch size). The last 20% of every chunk is held out
# for validation.
num_tensors_per_epoch = round(len(train_sets) / epochs)
num_tensors_for_valid = round(num_tensors_per_epoch * .2)

# per-sample losses collected across all chunks
allLosses = []
allValLosses = []

for epoch in range(epochs):
  start = num_tensors_per_epoch * epoch
  end = start + num_tensors_per_epoch
  split = end - num_tensors_for_valid  # train on [start, split), validate on [split, end)
  train_loss, individualLosses = train(model, device, train_sets[start:split], optimizer, criterion)
  val_loss, individualValLosses = evaluate(model, device, train_sets[split:end], optimizer, criterion)
  print(f'| Epoch: {epoch+1:02} | Train Loss: {train_loss:.3f} | Validation Loss: {val_loss:.3f}')
  allLosses.extend(individualLosses)
  allValLosses.extend(individualValLosses)

In [None]:
# Report the losses gathered during training:
# the very first per-sample training / validation loss, then the full lists.
print("First set, training loss:", allLosses[0])
print("First set, Val loss:", allValLosses[0])
print("All training loss: ", allLosses)
print("All validation loss: ", allValLosses)

In [None]:
# make the necessary predictions:
# one predicted start board per row of test_sets (delta followed by the end board)
predictions = predict(model, device, test_sets)

In [None]:
# stack the list of per-game predictions into a single np array
# (used as the data of the submission DataFrame)
np_pred = np.array(predictions)
print(np_pred)

In [None]:
# convert to a df for easy submission: one row per test game,
# one start_<i> column per cell of the 625-cell board
answer = pd.DataFrame(data = np_pred, columns = ["start_%d" % i for i in range(625)])

# Take the ids straight from the test file instead of assuming they start at
# 50000; predictions are produced in test_file row order, so they line up.
answer.insert(0, 'id', test_file['id'].to_numpy())

answer.to_csv('submission.csv', index=False)
answer