This notebook tests the generation of cellular automata of different forms, currently focusing on one-dimensional automata.

This builds on the 'Cellular Automata Local Host' file. The intention is to set it up to train on 'infinite data' (i.e. generate the data as training is performed) with the statistical changes required to see grokking.

Perhaps this can be used for data compression? Train a network to recognise patterns which can be generated by simple programmes, and then use a separate programme to correct that output against the true data in as little space as possible (i.e. turn lossy compression into lossless compression).

In [None]:
# Importing necessary libraries

!pip install cellpylib

import cellpylib as cpl
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from torch import Tensor
import seaborn as sns
import time

In [170]:
# Set Parameters
# These are notebook-level globals: later cells read them directly and some later cells reassign them.

# Data generation parameters
data_size = 100 # the number of cells in each generated row of data (also used as the network's input size)
#programmes_considered = np.arange(0,256,1) # the set of programmes being considered. For the 1D case it makes sense to consider all 0 to 255 programmes.
# NOTE(review): number_of_samples is commented out here, but the plot titles and the accuracy cell further down still reference it.
#number_of_samples = 2000 # the number of random times the output of a programme will be calculated, given random inputs
timesteps = 100 # the number of timesteps which each programme is run for before the output is used to train the model

# Training parameters
num_epochs = 2000  # Number of training epochs; main_train generates a fresh dataset on every epoch
hidden_size = 256  # Size of the network's hidden layer
learning_rate = 0.001 # learning rate passed to the SGD optimizer
batch_size = 32 # Batch size used by the DataLoaders; main_train also passes it to data_loader as the number of samples generated per epoch
train_ratio = 0.99 # Fraction of each generated dataset used for training; the remainder is used for testing

In [171]:
# Programme distribution

# Unnormalised sampling weight for rule i is 1 / (i + 10), so low-numbered rules are drawn most often.
# The weights do not need to sum to 1 here: create_data normalises them before sampling.
programmes_prob_distribution = np.array([(i + 10) ** (-1) for i in range(256)])

In [172]:
# Model Initialisation / Training setup

class NeuralNetwork(nn.Module):
    """Fully connected classifier: Linear -> BatchNorm -> ReLU -> Linear -> BatchNorm.

    forward() returns the batch-normalised output of the second linear layer;
    no softmax is applied inside the network.
    """

    def __init__(self, input_size, hidden_size, num_classes):
        super().__init__()
        # First stage: project the input row to the hidden width and normalise it.
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.bn1 = nn.BatchNorm1d(hidden_size)
        self.relu = nn.ReLU()
        # Second stage: map the hidden activations to one output per class.
        self.fc2 = nn.Linear(hidden_size, num_classes)
        self.bn2 = nn.BatchNorm1d(num_classes)

    def forward(self, x):
        hidden = self.relu(self.bn1(self.fc1(x)))
        return self.bn2(self.fc2(hidden))

# Define the input size, hidden size, and number of classes
input_size = data_size  # one network input per cell of a generated row
#hidden_size = 64  # Update with the desired size of the hidden layer
#num_classes = len(programmes_considered)+1  # Number of potential classes
num_classes = 256 #Number of potential classes, here stuck at 256

# Create an instance of the neural network
# NOTE: model, criterion and optimizer are notebook-level objects; main_train uses them as globals rather than taking them as arguments.
model = NeuralNetwork(input_size, hidden_size, num_classes)

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=learning_rate)

In [173]:
# Data generation functions (where the programmes_considered have a probability distribution)

def create_data(data_size, programmes_prob_distribution, number_of_samples, timesteps):
    """Generate labelled automaton rows, drawing the rule for each row at random.

    Args:
        data_size: Width of each generated row.
        programmes_prob_distribution: 256 non-negative weights, one per rule
            number 0..255. They are normalised here, so they need not sum to 1.
        number_of_samples: Number of rows to generate.
        timesteps: Passed to ``cpl.evolve`` as the number of timesteps.

    Returns:
        ``[dataset, labels]`` where ``dataset`` has shape
        ``(number_of_samples, data_size)`` and ``labels`` has shape
        ``(1, number_of_samples)`` and holds the rule number used for each row.
    """
    # Output buffers; every entry is overwritten in the loop below.
    dataset = np.empty(shape=(number_of_samples, data_size), dtype=int)
    labels = np.empty(shape=(1, number_of_samples), dtype=int)

    rule_space = np.arange(0, 256, 1)

    # Turn the raw weights into probabilities for np.random.choice.
    weight_total = sum(programmes_prob_distribution)
    rule_probabilities = [weight / weight_total for weight in programmes_prob_distribution]

    for sample_index in range(number_of_samples):
        rule_number = np.random.choice(a=rule_space, size=None, replace=True, p=rule_probabilities)

        initial_state = cpl.init_random(data_size)
        history = cpl.evolve(
            initial_state,
            timesteps=timesteps,
            memoize=True,
            apply_rule=lambda n, c, t: cpl.nks_rule(n, rule_number),
        )

        # Only the final row of the evolution is kept as the training sample.
        dataset[sample_index] = history[-1]
        labels[:, sample_index] = rule_number

    return [dataset, labels]


def data_split(data, train_ratio):
    """Shuffle (sample, label) pairs and split them into training and test parts.

    Args:
        data: Sequence of ``(sample, label)`` pairs. It is shuffled IN PLACE
            with ``np.random.shuffle``, so the caller's ordering is changed.
        train_ratio: Fraction of the pairs assigned to the training part. The
            cut is made at ``int(len(data) * train_ratio)``.

    Returns:
        ``[train_dataset, train_labels, test_dataset, test_labels]``, each a
        tuple. A part that receives no pairs is returned as empty tuples
        instead of raising.
    """
    np.random.shuffle(data)  # randomise which pairs land on each side of the cut
    split_index = int(len(data) * train_ratio)

    train_dataset, train_labels = _unzip_pairs(data[:split_index])
    test_dataset, test_labels = _unzip_pairs(data[split_index:])

    return [train_dataset, train_labels, test_dataset, test_labels]


def _unzip_pairs(pairs):
    """Split a sequence of (sample, label) pairs into a samples tuple and a labels tuple."""
    # zip(*[]) yields nothing, so unpacking it into two names would raise
    # ValueError; an empty side of the split is a legitimate outcome.
    if len(pairs) == 0:
        return (), ()
    samples, labels = zip(*pairs)
    return samples, labels

def data_loader(data_size, programmes_prob_distribution, number_of_samples, timesteps, train_ratio):
    """Generate a fresh dataset and wrap its train/test parts in DataLoaders.

    Args:
        data_size: Width of each generated row.
        programmes_prob_distribution: Sampling weights over the rule numbers,
            forwarded to ``create_data``.
        number_of_samples: Total number of rows to generate before splitting.
        timesteps: Forwarded to ``create_data``.
        train_ratio: Fraction of the rows placed in the training loader,
            forwarded to ``data_split``.

    Returns:
        ``[train_loader, test_loader]``. Both loaders shuffle, and both yield
        ``(data, labels)`` batches of float32 tensors; labels are the raw rule
        numbers (not shifted), so callers must cast them with ``.long()``
        before passing them to a cross-entropy loss.

    Note:
        The DataLoader batch size is read from the notebook-level
        ``batch_size`` variable; it is not a parameter of this function.
    """
    # Generate the data according to input parameters
    dataset, labels = create_data(data_size, programmes_prob_distribution, number_of_samples, timesteps)
    labels = labels[0]  # create_data returns labels with shape (1, number_of_samples)

    # Pair every row with its label so that shuffling keeps them together.
    data = list(zip(dataset, labels))
    train_dataset, train_labels, test_dataset, test_labels = data_split(data, train_ratio)

    tensor_train_dataset = TensorDataset(_to_float_tensor(train_dataset), _to_float_tensor(train_labels))
    tensor_test_dataset = TensorDataset(_to_float_tensor(test_dataset), _to_float_tensor(test_labels))

    train_loader = DataLoader(tensor_train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(tensor_test_dataset, batch_size=batch_size, shuffle=True)

    return [train_loader, test_loader]


def _to_float_tensor(values):
    """Convert a tuple of numpy rows or scalars into a single float32 tensor."""
    # Building a tensor directly from a tuple of numpy arrays converts element
    # by element, which is very slow; stacking into one ndarray first gives the
    # same values. This matters because a new dataset is built every epoch.
    return torch.as_tensor(np.asarray(values), dtype=torch.float32)

In [174]:
# Create the data only for a specified programme. Returns only a single vector

def create_data_single(data_size, rule_number, number_of_samples, timesteps):
    """Generate automaton rows that all come from one fixed rule.

    Args:
        data_size: Width of each generated row.
        rule_number: The programme (rule number) applied to every sample.
        number_of_samples: Number of rows to generate.
        timesteps: Passed to ``cpl.evolve`` as the number of timesteps.

    Returns:
        ``[dataset, labels]`` where ``dataset`` has shape
        ``(number_of_samples, data_size)`` and ``labels`` has shape
        ``(1, number_of_samples)`` with every entry equal to ``rule_number``.
    """
    # Output buffers; every entry is overwritten in the loop below.
    dataset = np.empty(shape=(number_of_samples, data_size), dtype=int)
    labels = np.empty(shape=(1, number_of_samples), dtype=int)

    def apply_fixed_rule(n, c, t):
        # Same rule at every cell and timestep.
        return cpl.nks_rule(n, rule_number)

    for sample_index in range(number_of_samples):
        initial_state = cpl.init_random(data_size)
        history = cpl.evolve(initial_state, timesteps=timesteps, memoize=True, apply_rule=apply_fixed_rule)

        # Only the final row of the evolution is kept.
        dataset[sample_index] = history[-1]
        labels[:, sample_index] = rule_number

    return [dataset, labels]

In [175]:
# Training loop (includes data generation). Note that here training and test loss cease to make much sense

def main_train(data_size, programmes_prob_distribution, batch_size, timesteps, train_ratio, num_epochs):
    """Train the notebook-level ``model`` on freshly generated data every epoch.

    Each epoch calls ``data_loader`` to generate ``batch_size`` new samples,
    trains on the training part, then measures the loss on the held-out part.
    The notebook-level ``model``, ``criterion`` and ``optimizer`` are used.

    Args:
        data_size: Width of each generated row.
        programmes_prob_distribution: Sampling weights over the rule numbers.
        batch_size: Number of samples generated per epoch (passed to
            ``data_loader`` as its ``number_of_samples``).
        timesteps: Forwarded to the data generation.
        train_ratio: Fraction of each epoch's samples used for training.
        num_epochs: Number of epochs to run.

    Returns:
        ``[training_loss, test_loss]``: two arrays of length ``num_epochs``.
        ``training_loss[e]`` is the loss of the last training batch of epoch
        ``e``; ``test_loss[e]`` is the sample-weighted mean loss over that
        epoch's test part. Entries are NaN where no batch was available.
    """
    training_loss = np.full(num_epochs, np.nan)
    test_loss = np.full(num_epochs, np.nan)

    # Each epoch trains on one freshly generated set of batch_size samples.
    for epoch in range(num_epochs):
        train_loader, test_loader = data_loader(data_size, programmes_prob_distribution, batch_size, timesteps, train_ratio)

        # Re-enter training mode every epoch: evaluation code switches the
        # model to eval mode, which would freeze the BatchNorm statistics.
        model.train()
        last_loss = float("nan")
        for data, labels in train_loader:
            # Forward pass
            outputs = model(data)

            # Rule numbers 0..255 are already valid class indices. Shifting by
            # the batch minimum would map the same rule to a different class
            # in different batches, so the labels are used unchanged.
            loss = criterion(outputs, labels.long())

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            last_loss = loss.item()

        # Print the loss after each epoch
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {last_loss}")
        training_loss[epoch] = last_loss
        test_loss[epoch] = _mean_test_loss(test_loader)

    return [training_loss, test_loss]


def _mean_test_loss(test_loader):
    """Return the sample-weighted mean loss of the notebook-level model over test_loader."""
    loss_sum = 0.0
    sample_count = 0

    # Eval mode makes BatchNorm use its running statistics, which is required
    # when a test batch holds a single sample.
    model.eval()
    with torch.no_grad():
        for data, labels in test_loader:
            batch_loss = criterion(model(data), labels.long())
            loss_sum += batch_loss.item() * labels.size(0)
            sample_count += labels.size(0)
    model.train()

    return loss_sum / sample_count if sample_count else float("nan")

In [None]:
# Produce lineplot of data with MatPlotLib
# Plots the per-epoch training loss returned by main_train, so the cell that calls main_train must be run before this one.
#num_epochs = 10000
# NOTE(review): this assumes num_epochs still equals len(training_loss); a later cell reassigns num_epochs.
epochs = np.arange(0,num_epochs, 1)

# Total number of elements across all model parameters (shown in the plot title)
parameter_number = sum(p.numel() for p in model.parameters())

#plt.plot(epochs, training_loss, test_loss)
plt.plot(epochs, training_loss)
plt.xlabel("Epoch")
plt.ylabel("Cross Entropy Loss")
# NOTE(review): number_of_samples is only assigned in commented-out lines of this notebook; confirm it is defined before running this cell.
plt.title("Loss during training, " + str(number_of_samples) + " samples, " + str(data_size) + " width per entry, " + str(parameter_number) + " parameters")
plt.ylim(bottom = 0)
#plt.legend(["Training Loss", "Test Loss"])
plt.show()

In [None]:
# Produce a Seaborn plot of the training and test loss, grouped into windows of consecutive epochs.
# Requires training_loss and test_loss from the cell that calls main_train.

# Set the resolution before plotting: figure.dpi is read when a figure is created,
# so changing it after the plot has been drawn does not affect that plot.
plt.rcParams['figure.dpi'] = 300

parameter_number = sum(p.numel() for p in model.parameters())

# Size of the averaging window, in epochs.
moving_avg = 5

# Derive the epoch axis from the recorded losses rather than from num_epochs,
# which other cells may have reassigned since training ran.
epochs = np.arange(0, len(training_loss), 1)

# Keep only the leading epochs that fill whole windows. Reshaping an array whose
# length is not a multiple of moving_avg raises, so the remainder is dropped explicitly.
window_count = len(training_loss) // moving_avg
usable_epochs = window_count * moving_avg

reshaped_training_loss = np.reshape(np.asarray(training_loss)[:usable_epochs], (window_count, moving_avg))
reshaped_test_loss = np.reshape(np.asarray(test_loss)[:usable_epochs], (window_count, moving_avg))
reshaped_epochs = np.reshape(epochs[:usable_epochs], (window_count, moving_avg))

# Label every value in a window with the window's first epoch, so that seaborn
# aggregates each window into one point on the line.
filtered_epochs = reshaped_epochs[:,0]
repeated_filtered_epochs = np.repeat(filtered_epochs, moving_avg)

pandas_df = pd.DataFrame({'Training Loss': reshaped_training_loss.flatten(), 'Test Loss': reshaped_test_loss.flatten(), 'Epochs': repeated_filtered_epochs})
pandas_df_melted = pd.melt(pandas_df, id_vars = 'Epochs', value_vars = ['Training Loss', 'Test Loss'], var_name='line', value_name = 'Values')


sns.lineplot(data=pandas_df_melted, x='Epochs', y='Values', hue='line')
plt.xlabel('Epochs')
plt.ylabel('Cross Entropy Loss')
# NOTE(review): number_of_samples is only assigned in commented-out lines of this notebook; confirm it is defined before running this cell.
plt.title("Loss during training, " + str(number_of_samples) + " samples, " + str(data_size) + " width per entry, " + str(parameter_number) + " parameters")
plt.legend(loc='best')
plt.show()

In [177]:
# Call the training loop
# The two returned loss arrays are stored as notebook globals and are read by the plotting cells.
#num_epochs = 1000
#batch_size = 512
[training_loss, test_loss] = main_train(data_size, programmes_prob_distribution, batch_size, timesteps, train_ratio, num_epochs)

Epoch [1/2000], Loss: 6.035799026489258
Epoch [2/2000], Loss: 6.107973098754883
Epoch [3/2000], Loss: 6.181026458740234
Epoch [4/2000], Loss: 5.949923992156982
Epoch [5/2000], Loss: 6.057840824127197
Epoch [6/2000], Loss: 6.040886878967285
Epoch [7/2000], Loss: 5.961490154266357
Epoch [8/2000], Loss: 5.804179668426514
Epoch [9/2000], Loss: 5.974383354187012
Epoch [10/2000], Loss: 6.325131893157959
Epoch [11/2000], Loss: 6.1790666580200195
Epoch [12/2000], Loss: 6.044686794281006
Epoch [13/2000], Loss: 5.8875732421875
Epoch [14/2000], Loss: 5.920075416564941
Epoch [15/2000], Loss: 5.8804612159729
Epoch [16/2000], Loss: 6.161624431610107
Epoch [17/2000], Loss: 5.791683197021484
Epoch [18/2000], Loss: 6.111616611480713
Epoch [19/2000], Loss: 5.610601425170898
Epoch [20/2000], Loss: 5.720091819763184
Epoch [21/2000], Loss: 5.876699447631836
Epoch [22/2000], Loss: 5.9889397621154785
Epoch [23/2000], Loss: 6.122483730316162
Epoch [24/2000], Loss: 5.849305152893066
Epoch [25/2000], Loss: 6.01

In [None]:
# Calculate accuracy of a model 
#number_of_samples = 2000
def model_accuracy(model, data_size, programmes_prob_distribution, number_of_samples, timesteps):
    """Measure classification accuracy of ``model`` on newly generated data.

    Because training uses freshly generated data each epoch, new data is
    generated here as well. The data comes from ``data_loader`` with a fixed
    0.8 split, and only the training part of that split is evaluated.

    Args:
        model: The network to evaluate. Its train/eval mode is restored on exit.
        data_size: Width of each generated row.
        programmes_prob_distribution: Sampling weights over the rule numbers.
        number_of_samples: Number of rows generated before the 0.8 split.
        timesteps: Forwarded to the data generation.

    Returns:
        The accuracy as a percentage (0-100).
    """
    train_loader, _ = data_loader(data_size, programmes_prob_distribution, number_of_samples, timesteps, 0.8)

    correct = 0
    total = 0

    # Remember the current mode so evaluation does not leave the model stuck in
    # eval mode, which would freeze BatchNorm statistics in later training.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for data, labels in train_loader:
                outputs = model(data)
                _, predicted = torch.max(outputs, 1)  # index of the largest output = predicted rule
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
    finally:
        model.train(was_training)

    return 100 * correct / total  # accuracy as a percentage

In [None]:
# Print the known accuracy
# NOTE(review): number_of_samples is only assigned in commented-out lines of this notebook; confirm it is defined before running this cell.

accuracy = model_accuracy(model, data_size, programmes_prob_distribution, number_of_samples, timesteps)
print(f"Accuracy on the generated set: {accuracy}%")

# Note form work below

-------------------------------------------------

In [None]:
# Example of how to use the programmes_prob_distribution variable
# NOTE: this overwrites the notebook-level programmes_prob_distribution with a 10-entry list.

programmes_prob_distribution = [100,5,5,2,3,5,7,3,1,0]

# Normalise the raw weights so that they sum to 1, as np.random.choice requires.
weight_total = sum(programmes_prob_distribution)
programmes_prob_distribution_norm = [weight / weight_total for weight in programmes_prob_distribution]

# One candidate rule number per weight.
programmes = np.arange(len(programmes_prob_distribution))

for i in range(100):
    rule_number = np.random.choice(programmes, size=None, replace=True, p=programmes_prob_distribution_norm)
    print(f"Instance {i}: rule_number = ", rule_number)

In [None]:
# Scratch check: build logarithmic weights (log(2 + i) for rule i) and print the loaders that data_loader returns for them.
# NOTE: this overwrites the notebook-level programmes_prob_distribution.
programmes_prob_distribution = np.array([np.log(2 + i) for i in range(256)])
programmes = np.arange(0, 256, 1)
#print(programmes.shape)

#print(programmes_prob_distribution.shape)

print(data_loader(100, programmes_prob_distribution, 50, 100, 0.8))

In [None]:
# Simple training test  (on a single programme)

# All of the sampling weight is placed on rule 0, so only rule 0 can be drawn.
programmes_prob_distribution = [0] * 256
programmes_prob_distribution[0] = 1

# NOTE: these assignments overwrite the notebook-level num_epochs and batch_size that other cells read.
num_epochs = 1000
batch_size = 256
# The value returned by main_train is not stored here.
main_train(data_size, programmes_prob_distribution, batch_size, timesteps, train_ratio, num_epochs)

In [None]:
# Scratch check: generate one sample for rule 150 and print the resulting row.
[dataset, labels] = create_data_single(data_size, 150, 1, timesteps)

print(dataset)