# Imports
All imports should be defined here to reduce clutter

In [12]:
import numpy as np
import pygame
import torch
import torchvision
from torch.utils.tensorboard import SummaryWriter
from torchvision import datasets, transforms
import matplotlib.pyplot as plot

from NovelSwarmBehavior.novel_swarms.config.EvolutionaryConfig import GeneticEvolutionConfig
from NovelSwarmBehavior.novel_swarms.config.WorldConfig import RectangularWorldConfig
from NovelSwarmBehavior.novel_swarms.config.defaults import ConfigurationDefaults
from NovelSwarmBehavior.novel_swarms.novelty.GeneRule import GeneRule
from NovelSwarmBehavior.novel_swarms.config.OutputTensorConfig import OutputTensorConfig
from generation.HaltedEvolution import HaltedEvolution

# Function Declarations

In [13]:
def initializeHaltedEvolution():
    agent_config = ConfigurationDefaults.DIFF_DRIVE_AGENT

    genotype = [
        GeneRule(_max=1.0, _min=-1.0, mutation_step=0.4, round_digits=4),
        GeneRule(_max=1.0, _min=-1.0, mutation_step=0.4, round_digits=4),
        GeneRule(_max=1.0, _min=-1.0, mutation_step=0.4, round_digits=4),
        GeneRule(_max=1.0, _min=-1.0, mutation_step=0.4, round_digits=4),
    ]

    phenotype = ConfigurationDefaults.BEHAVIOR_VECTOR

    world_config = RectangularWorldConfig(
        size=(500, 500),
        n_agents=30,
        behavior=phenotype,
        agentConfig=agent_config,
        padding=15
    )

    novelty_config = GeneticEvolutionConfig(
        gene_rules=genotype,
        phenotype_config=phenotype,
        n_generations=100,
        n_population=100,
        crossover_rate=0.7,
        mutation_rate=0.15,
        world_config=world_config,
        k_nn=15,
        simulation_lifespan=600,
        display_novelty=False,
        save_archive=False,
        show_gui=True
    )

    pygame.init()
    pygame.display.set_caption("Evolutionary Novelty Search")
    screen = pygame.display.set_mode((world_config.w, world_config.h))

    output_config = OutputTensorConfig(
        timeless=True,
        total_frames=80,
        steps_between_frames=2,
        screen=screen
    )

    halted_evolution = HaltedEvolution(
        world=world_config,
        evolution_config=novelty_config,
        output_config=output_config
    )

    return halted_evolution

In [14]:
class BehaviorIdentificationModel(torch.nn.Module):
    def __init__(self, n_classes=2):
        super(BehaviorIdentificationModel, self,).__init__()

        self.n_classes = n_classes

        self.conv1 = torch.nn.Conv2d(1, 1, 5, stride=2, padding=2)
        self.activation1 = torch.nn.ReLU()
        self.conv2 = torch.nn.Conv2d(1, 1, 3, stride=2, padding=1)
        self.activation2 = torch.nn.ReLU()
        self.conv3 = torch.nn.Conv2d(1, 1, 3, stride=2)
        self.activation3 = torch.nn.ReLU()

        self.pooling = torch.nn.MaxPool2d(2, stride=2)
        self.flatten = torch.nn.Flatten()
        self.linear1 = torch.nn.Linear(3844, 400)

        self.activation5 = torch.nn.ReLU()
        self.linear2 = torch.nn.Linear(400, 100)
        self.activation6 = torch.nn.ReLU()
        self.classification_layer = torch.nn.Linear(100, self.n_classes)

        self.softmax = torch.nn.Softmax(dim=0)

    def forward(self, x):
        x = self.conv1(x)
        x = self.activation1(x)
        x = self.conv2(x)
        x = self.activation2(x)
        # print(x.size())
        # x = self.conv3(x)
        # x = self.activation3(x)

        x = self.pooling(x)
        x = self.flatten(x)
        x = self.linear1(x)
        x = self.activation5(x)
        x = self.linear2(x)
        x = self.activation6(x)
        x = self.classification_layer(x)
        return x

    def increaseClassCount(self):
        self.n_classes += 1
        self.classification_layer = torch.nn.Linear(100, self.n_classes)

# Test Halted Evolution + TensorBoard
The following initializes a possible evolution, simulates the genome, and outputs the resulting frame to tensorboard

In [23]:
def testTensorBoardAndEvolutionNexting():
    # Writer will output to ./runs/ directory by default
    writer = SummaryWriter()

    evolution = initializeHaltedEvolution()
    evolution.evolve_config.lifespan = 1200
    evolution.setup()

    for i in range(60):
        frame, behavior_vector = evolution.next()
        frame = frame.astype(np.uint8)
        reshaped = np.reshape(frame, (1, 500, 500))

        # Tensorboard output
        writer.add_image('images', reshaped.astype(np.uint8), i, dataformats="CWH")

    writer.close()
    pygame.quit()

testTensorBoardAndEvolutionNexting()

Hello World!


# Test Single Pass Through of the Network

In [16]:
def testInputAndBackpropOfAlteredNetwork():
    evolution = initializeHaltedEvolution()
    evolution.setup()

    model = BehaviorIdentificationModel(n_classes=6)
    frame, _ = evolution.next()
    frame = frame.astype(np.uint8)
    reshaped = np.reshape(frame, (1, 500, 500))

    # Initialize a test loss function
    loss_fn = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

    tensor = torch.tensor(reshaped, dtype=torch.float32)
    y_hat1 = model.forward(tensor)
    print(y_hat1)

    y_truth = torch.tensor([[1, 0, 0, 0, 0, 0]], dtype=torch.float32)
    loss = loss_fn(y_hat1, y_truth)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    model.increaseClassCount()
    y_hat2 = model.forward(tensor)
    y_truth = torch.tensor([[1, 0, 0, 0, 0, 0, 0]], dtype=torch.float32)
    loss = loss_fn(y_hat2, y_truth)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    print(y_hat1, y_hat2)
    pygame.quit()
#testInputAndBackpropOfAlteredNetwork()

Get Number of trainable parameters in the network

In [17]:
def printModelInformation():
    model = BehaviorIdentificationModel(n_classes=6)
    print(sum(p.numel() for p in model.parameters() if p.requires_grad))
    print(f"Model structure: {model}\n\n")
#printModelInformation()

In [20]:
def training_iteration(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    for batch, (X, y) in enumerate(dataloader):
        optimizer.zero_grad()

        # Compute prediction and loss
        pred = model(X)
        loss = loss_fn(pred, y)

        # print(f"Pred: {pred}", f"Truth: {y}")

        # Backpropagation
        loss.backward()
        optimizer.step()

        if batch % 20 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

    return loss

def testing_iteration(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0

    with torch.no_grad():
        for X, y in dataloader:
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            soft_func = torch.nn.Softmax(dim=1)
            softy = soft_func(pred)

            for i, row in enumerate(softy):
                if row.argmax() == y[i]:
                    correct += 1

            # correct += (softy.argmax(1) == y).type(torch.float).sum().item()
        print(pred, softy)

    test_loss /= num_batches
    # print(correct, size)
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

    return test_loss, correct

epochs = 40
model = BehaviorIdentificationModel(n_classes=2)
data_transform = transforms.Compose([transforms.Grayscale(num_output_channels=1),
                                     transforms.ToTensor()])
train_folder = torchvision.datasets.ImageFolder(root="./data/train", transform=data_transform)
test_folder = torchvision.datasets.ImageFolder(root="./data/test", transform=data_transform)

train_loader = torch.utils.data.DataLoader(
    train_folder,
    batch_size=10,
    shuffle=True
)

test_loader = torch.utils.data.DataLoader(
    test_folder,
    batch_size=10,
    shuffle=True
)

loss_fn = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

writer = SummaryWriter()

for epoch in range(epochs):
    print(f"Starting epoch {epoch}")
    print(type(writer))
    train_loss = training_iteration(train_loader, model, loss_fn, optimizer)
    writer.add_scalar('Loss/Train', train_loss, global_step=epoch)

    if epoch % 1 == 0:
        test_loss, accuracy = testing_iteration(test_loader, model, loss_fn)
        writer.add_scalar('Loss/Test', test_loss, global_step=epoch)
        writer.add_scalar('Accuracy/Test', accuracy, epoch)


Starting epoch 0
<class 'torch.utils.tensorboard.writer.SummaryWriter'>
loss: 0.704639  [    0/  240]
loss: 0.698744  [  200/  240]
tensor([[0.0198, 0.0752],
        [0.0210, 0.0757],
        [0.0196, 0.0751],
        [0.0200, 0.0754]]) tensor([[0.4862, 0.5138],
        [0.4863, 0.5137],
        [0.4861, 0.5139],
        [0.4862, 0.5138]])
Test Error: 
 Accuracy: 51.8%, Avg loss: 0.691775 

Starting epoch 1
<class 'torch.utils.tensorboard.writer.SummaryWriter'>
loss: 0.687584  [    0/  240]
loss: 0.695818  [  200/  240]
tensor([[-0.0153,  0.1078],
        [-0.0153,  0.1076],
        [-0.0152,  0.1080],
        [-0.0159,  0.1077]]) tensor([[0.4693, 0.5307],
        [0.4693, 0.5307],
        [0.4693, 0.5307],
        [0.4691, 0.5309]])
Test Error: 
 Accuracy: 51.8%, Avg loss: 0.692810 

Starting epoch 2
<class 'torch.utils.tensorboard.writer.SummaryWriter'>
loss: 0.694654  [    0/  240]
loss: 0.693968  [  200/  240]
tensor([[0.0491, 0.0405],
        [0.0522, 0.0372],
        [0.0496, 0.0