<a href="https://colab.research.google.com/github/elichen/numerical-linear-algebra/blob/master/nematode.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [24]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import animation
from IPython.display import HTML
import random
from tqdm.notebook import tqdm
from torch.utils.data import TensorDataset, DataLoader
from torch.utils.data import random_split

In [2]:
grid_size = 100
np.random.seed(42)  # For reproducible random locations

In [5]:
# Define the updated HeuristicController class
class HeuristicController:
    def decide_move(self, local_gradient):
        # Find the direction with the highest gradient value
        max_gradient = -float('inf')
        move_direction = (0, 0)
        for direction, gradient_value in local_gradient.items():
            if gradient_value > max_gradient:
                max_gradient = gradient_value
                move_direction = direction

        return move_direction

# Define the updated WormEnvironment class with gradient map calculation
class WormEnvironment:
    def __init__(self, grid_size, controller):
        self.grid_size = grid_size
        self.controller = controller
        self.reset_environment()

    def reset_environment(self):
        self.worm_position = (random.randint(0, self.grid_size-1), random.randint(0, self.grid_size-1))
        self.pellet_positions = [(random.randint(0, self.grid_size-1), random.randint(0, self.grid_size-1)) for _ in range(10)]
        self.score = 0

    def calculate_local_gradient(self):
        """Calculate the chemical concentration gradient in the immediate vicinity of the worm."""
        x, y = self.worm_position
        local_gradient = {}

        # Consider only the immediate surroundings: up, down, left, and right
        for dx, dy in [(0, -1), (0, 1), (-1, 0), (1, 0)]:  # Directions: Up, Down, Left, Right
            adjacent_x, adjacent_y = x + dx, y + dy

            # Check bounds
            if 0 <= adjacent_x < self.grid_size and 0 <= adjacent_y < self.grid_size:
                gradient_value = 0
                for pellet in self.pellet_positions:
                    distance = np.sqrt((pellet[0] - adjacent_x) ** 2 + (pellet[1] - adjacent_y) ** 2)
                    gradient_value += max(0, 1 / (1 + distance))  # Simplified gradient calculation
                local_gradient[(dx, dy)] = gradient_value

        return local_gradient

    def update(self):
        """Update the environment state based on the worm's decision to move based on local gradients."""
        local_gradient = self.calculate_local_gradient()
        move_direction = self.controller.decide_move(local_gradient)
        new_position = (min(max(self.worm_position[0] + move_direction[0], 0), self.grid_size-1),
                        min(max(self.worm_position[1] + move_direction[1], 0), self.grid_size-1))
        self.worm_position = new_position
        if self.worm_position in self.pellet_positions:
            self.pellet_positions.remove(self.worm_position)
            self.score += 1
        return self.worm_position, self.pellet_positions, self.score

    def draw(self):
        grid = np.zeros((self.grid_size, self.grid_size))
        grid[self.worm_position] = -1  # Worm's position
        for pellet in self.pellet_positions:
            grid[pellet] = 1  # Pellet's position
        return grid

In [6]:
fig, ax = plt.subplots()
environment = WormEnvironment(grid_size, HeuristicController())

def animate(i):
    ax.clear()
    position, pellets, score = environment.update()
    grid = environment.draw()
    ax.imshow(grid, cmap='viridis')
    ax.set_title(f'Step: {i} Score: {score}')
    return ax

ani = animation.FuncAnimation(fig, animate, frames=200, interval=100, blit=False, repeat=False)
plt.close(fig)
HTML(ani.to_jshtml())

In [25]:
def generate_training_data(environment, num_instances):
    inputs = []
    outputs = []

    for _ in range(num_instances):
        # Reset the environment to a random state for each instance
        environment.reset_environment()

        # Calculate the local gradient around the worm
        local_gradient = environment.calculate_local_gradient()

        # Create input feature vector from the local gradients
        input_features = [local_gradient.get((0, -1), 0),  # Gradient Up
                          local_gradient.get((0, 1), 0),   # Gradient Down
                          local_gradient.get((-1, 0), 0),  # Gradient Left
                          local_gradient.get((1, 0), 0)]   # Gradient Right
        inputs.append(input_features)

        # Determine the optimal move direction based on the highest gradient
        best_direction = max(local_gradient, key=local_gradient.get)
        output_label = [(0, -1) == best_direction,  # Up
                        (0, 1) == best_direction,   # Down
                        (-1, 0) == best_direction,  # Left
                        (1, 0) == best_direction]   # Right
        outputs.append(output_label)

    return np.array(inputs), np.array(outputs)

environment = WormEnvironment(grid_size, HeuristicController())
num_training_instances = 10000
inputs, outputs = generate_training_data(environment, num_training_instances)

In [26]:
class NeuralController(nn.Module):
    def __init__(self, input_size, output_size):
        super(NeuralController, self).__init__()
        # Define the architecture of the neural network
        self.network = nn.Sequential(
            nn.Linear(input_size, 128),  # Input layer to hidden layer
            nn.ReLU(),  # Activation function
            nn.Linear(128, output_size),  # Hidden layer to output layer
            nn.Softmax(dim=1)  # Output layer with softmax to predict movement direction
        )

    def forward(self, x):
        return self.network(x)

    def decide_move(self, local_gradient):
        gradient_values = [local_gradient.get(direction, 0) for direction in [(0, -1), (0, 1), (-1, 0), (1, 0)]]
        # Convert local gradient to a tensor
        gradient_tensor = torch.FloatTensor([gradient_values]).to(next(self.parameters()).device)

        # Predict the move direction using the neural network
        with torch.no_grad():
            prediction = self.forward(gradient_tensor)

        # Convert the network's prediction to a move direction
        move_index = torch.argmax(prediction, dim=1).item()
        direction_mapping = {0: (0, -1),  # Up
                             1: (0, 1),   # Down
                             2: (-1, 0),  # Left
                             3: (1, 0)}   # Right
        move_direction = direction_mapping[move_index]

        return move_direction

In [27]:
inputs_tensor = torch.tensor(inputs, dtype=torch.float)
outputs_tensor = torch.tensor(outputs, dtype=torch.long)
_, labels_indices = torch.max(outputs_tensor, 1)
dataset = TensorDataset(inputs_tensor, labels_indices)  # Use class indices for labels
train_size = int(0.9 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)  # Shuffling is not necessary for validation

In [29]:
# Initialize the NeuralController with the appropriate input and output sizes
model = NeuralController(input_size=4, output_size=4)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

for epoch in range(num_epochs):
    model.train()  # Set the model to training mode
    total_train_loss = 0

    for inputs, labels in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_train_loss += loss.item()

    avg_train_loss = total_train_loss / len(train_loader)

    # Validation phase
    model.eval()  # Set the model to evaluation mode
    total_val_loss = 0
    with torch.no_grad():  # No need to track gradients for validation
        for inputs, labels in val_loader:
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            total_val_loss += loss.item()

    avg_val_loss = total_val_loss / len(val_loader)

    print(f'Epoch {epoch+1}/{num_epochs}, Training Loss: {avg_train_loss:.4f}, Validation Loss: {avg_val_loss:.4f}')

Epoch 1/10, Training Loss: 1.3857, Validation Loss: 1.3830
Epoch 2/10, Training Loss: 1.3821, Validation Loss: 1.3789
Epoch 3/10, Training Loss: 1.3778, Validation Loss: 1.3739
Epoch 4/10, Training Loss: 1.3729, Validation Loss: 1.3673
Epoch 5/10, Training Loss: 1.3671, Validation Loss: 1.3606
Epoch 6/10, Training Loss: 1.3624, Validation Loss: 1.3557
Epoch 7/10, Training Loss: 1.3559, Validation Loss: 1.3497
Epoch 8/10, Training Loss: 1.3506, Validation Loss: 1.3433
Epoch 9/10, Training Loss: 1.3441, Validation Loss: 1.3402
Epoch 10/10, Training Loss: 1.3386, Validation Loss: 1.3300


In [30]:
fig, ax = plt.subplots()
model.eval()
pellet_positions = [(np.random.randint(0, grid_size), np.random.randint(0, grid_size)) for _ in range(5)]
environment = WormEnvironment(grid_size, model)

def animate(i):
    ax.clear()
    position, pellets, score = environment.update()
    grid = environment.draw()
    ax.imshow(grid, cmap='viridis')
    ax.set_title(f'Step: {i} Score: {score}')
    return ax

ani = animation.FuncAnimation(fig, animate, frames=200, interval=100, blit=False, repeat=False)
plt.close(fig)
HTML(ani.to_jshtml())