In [None]:
import torch
from torch.utils.data import DataLoader
import os
from torch import nn
from torch.utils.data import Dataset
import pandas as pd

In [None]:
class PipesDataset(Dataset):
    """Rows of a CSV file exposed as ``(state, action)`` tensor pairs.

    The first column of each row is treated as a string of digits describing
    the board (one integer per character); the second column is parsed as a
    single integer action.
    """

    def __init__(self, path: str):
        """Load the CSV found at ``path``, resolved against the current working directory."""
        self.path = path
        # Every column is read as a string so the state can be walked
        # character by character in __getitem__.
        self.df = pd.read_csv(os.path.join(os.getcwd(), path), dtype=str)

    def __len__(self):
        """Number of rows (samples) in the CSV."""
        return len(self.df)

    def __getitem__(self, idx: int):
        """Return the ``(state_tensor, action_tensor)`` pair stored at row ``idx``."""
        row = self.df.iloc[idx]

        state_digits = row.iloc[0]
        action_value = int(row.iloc[1])

        # One integer per character of the state string; together they
        # describe the whole board.
        state_tensor = torch.tensor(list(map(int, state_digits)))
        action_tensor = torch.tensor(action_value)

        return state_tensor, action_tensor

In [None]:
class PipesPredictor(nn.Module):
    """Small fully connected network: Linear -> ReLU -> Linear.

    The output of the final linear layer is returned as-is (no softmax is
    applied inside the model).
    """

    def __init__(self, input_size, hidden_size, output_size):
        """Build the two linear layers with a ReLU in between."""
        super().__init__()
        # Layers are created in forward order and stored under `layers`,
        # which determines the parameter names in the state dict.
        to_hidden = nn.Linear(input_size, hidden_size)
        activation = nn.ReLU()
        to_output = nn.Linear(hidden_size, output_size)
        self.layers = nn.Sequential(to_hidden, activation, to_output)

    def forward(self, x):
        """Run ``x`` through the layer stack and return the raw scores."""
        scores = self.layers(x)
        return scores

In [None]:
class PipesLoss(nn.Module):
    """Cross-entropy style loss against multi-hot action labels.

    Each sample's labels are normalised to sum to one, compared against the
    softmax of the logits, and the resulting per-sample loss is averaged over
    the batch.
    """

    def __init__(self, penalty=1.0):
        """
        :param penalty: Stored on the module; ``forward`` does not currently use it.
        """
        super().__init__()
        self.penalty = penalty

    def forward(self, pred, actions):
        """
        Compute the mean loss over a batch.

        :param pred: Tensor of shape (batch_size, num_classes) - raw logits
        :param actions: Tensor of shape (batch_size, num_classes) - multi-hot encoded labels (0 or 1)
        :return: Scalar tensor holding the batch-mean loss
        """
        # convert the output to probabilities
        probabilities = nn.functional.softmax(pred, dim=1)

        # Number of active labels per sample. Clamped once and reused for both
        # divisions below, so a sample with no active label contributes a loss
        # of 0 instead of 0/0 = NaN (which would poison the whole batch mean).
        label_counts = actions.sum(dim=1, keepdim=True).clamp(min=1e-7)

        # normalize labels so that each row sums to one
        normalized_labels = actions / label_counts

        # cross entropy for each sample; the epsilon keeps log() finite
        per_sample = -torch.sum(
            normalized_labels * torch.log(probabilities + 1e-7), dim=1
        )

        # NOTE(review): the loss is divided by the label count a second time
        # here, on top of the normalisation above - kept as in the original,
        # confirm that this double scaling is intended.
        loss = per_sample / label_counts.squeeze(1)

        # take the average of the loss values
        return loss.mean()

In [None]:
# load the pre-split training and testing data from their CSV files
train_data = PipesDataset("data/train.csv")
test_data = PipesDataset("data/test.csv")

# prepare the datasets for mini-batch iteration with DataLoaders
batch_size = 64
train_dataloader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
# The test set is not shuffled: evaluation does not need a random order, and
# test_loop averages per-batch losses, so a fixed batch composition keeps the
# reported loss identical from run to run.
test_dataloader = DataLoader(test_data, batch_size=batch_size, shuffle=False)

# fetch the first batch from each loader (handy for inspecting shapes/dtypes)
train_features, train_labels = next(iter(train_dataloader))

test_features, test_labels = next(iter(test_dataloader))

# use the available accelerator if there is one, otherwise fall back to the CPU
device = (
    torch.accelerator.current_accelerator().type
    if torch.accelerator.is_available()
    else "cpu"
)
print(f"Using {device} device")

In [None]:
# Board side length: the board holds n * n pipes.
n = 4
num_pipes = n**2

# Input: 4 values per pipe. Hidden layer: 64 units. Output: one score per pipe.
model = PipesPredictor(num_pipes * 4, 64, num_pipes)
model = model.to(device)

# Training hyperparameters
epochs = 5
learning_rate = 1e-3

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

In [None]:
def train_loop(dataloader, model, loss_fn, optimizer):
    """Train ``model`` for one pass over ``dataloader``.

    Prints the current loss and the number of samples processed on every
    100th batch (including the first).

    :param dataloader: Iterable of ``(X, y)`` batches; ``dataloader.dataset`` must support ``len()``
    :param model: The ``nn.Module`` to train (switched to training mode)
    :param loss_fn: Callable taking ``(pred, y)`` and returning a scalar loss tensor
    :param optimizer: Optimizer bound to ``model``'s parameters
    """
    size = len(dataloader.dataset)

    # Move batches to wherever the model lives instead of relying on a
    # notebook-level `device` variable being defined.
    first_param = next(model.parameters(), None)
    target_device = (
        first_param.device if first_param is not None else torch.device("cpu")
    )

    # Running count of processed samples; independent of any global batch
    # size and correct even when the last batch is smaller.
    samples_seen = 0

    model.train()
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(target_device).float(), y.to(target_device)

        # Clear gradients before the forward/backward pass so that anything
        # left over from an earlier (possibly interrupted) run cannot leak
        # into this update.
        optimizer.zero_grad()
        pred = model(X)
        loss = loss_fn(pred, y)

        # backpropagation
        loss.backward()
        optimizer.step()

        samples_seen += len(X)
        if batch % 100 == 0:
            print(f"loss: {loss.item():>7f}  [{samples_seen:>5d}/{size:>5d}]")


def test_loop(dataloader, model, loss_fn):
    model.eval()
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0

    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device).float(), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            predicted_labels = torch.argmax(pred, dim=1)
            correct += (predicted_labels == y).sum().item()


    test_loss /= num_batches
    correct /= size
    print(
        f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n"
    )

In [None]:
# One training pass followed by one evaluation pass per epoch.
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------------")
    train_loop(
        dataloader=train_dataloader,
        model=model,
        loss_fn=loss_fn,
        optimizer=optimizer,
    )
    test_loop(dataloader=test_dataloader, model=model, loss_fn=loss_fn)
print("Done!")

In [None]:
# Save only the learned parameters (the state dict) into the current working directory.
curr_dir = os.getcwd()
model_file_name = "model.pth"
model_file_path = os.path.join(curr_dir, model_file_name)

trained_weights = model.state_dict()
torch.save(trained_weights, model_file_path)

In [None]:
# Reload the saved parameters into the model and switch it to evaluation mode.
model_file_name = "model.pth"
# Resolve the working directory here so that this cell also runs on a fresh
# kernel where the cell that saves the model has not been executed.
curr_dir = os.getcwd()
model_file_path = os.path.join(curr_dir, model_file_name)
# map_location lets a checkpoint written on one device (e.g. a GPU) be read on
# the device used in this session; weights_only restricts unpickling to tensors.
model.load_state_dict(
    torch.load(model_file_path, map_location=device, weights_only=True)
)
model.eval()

In [None]:
# Bookkeeping shared with pick_move(). Names assigned at module level are
# already global, so no `global` statement is needed here.
cache = []
visited = set()

# Puzzle to solve: the starting board and the target board, each written as a
# string of bits with 4 bits per pipe.
initial = "0110101010110100010101101110000110101010001110111000001101000100"
goal = "0110010101110001101001101011001010101010110010111000110000011000"

# Alternative puzzle (swap in for the pair above to try a different board):
# initial = "0010100001001001101011001010011101011000010001111001110101011100"
# goal = "0010001001000011101011000101101110100010001010111100110101011001"

In [None]:
def pick_move(state):
    """Choose the next pipe to rotate for ``state`` using the trained model.

    The model scores every pipe; candidates are tried from highest to lowest
    score, and the first rotation that produces a board not yet in ``visited``
    is taken (and recorded in ``visited``).

    If every candidate leads to an already visited board, the rotation of the
    lowest-ranked pipe is returned anyway - unless that rotation leaves the
    board unchanged, in which case an exception is raised.

    Relies on the notebook-level ``model``, ``device`` and ``visited``.

    :param state: Board as a string of bits (4 bits per pipe)
    :return: Tuple ``(new_state, result)`` - the board after the rotation and
        the index of the rotated pipe
    :raises Exception: If the selected rotation does not change the board
    """
    # convert the state to integers, then to a float tensor on the model's device
    state_bits = [int(bit) for bit in state]
    state_tensor = torch.tensor(state_bits).to(device).float()

    # get the predicted scores from the neural network; this is inference
    # only, so there is no need to track gradients
    with torch.no_grad():
        scores = model(state_tensor)

    # Rank every output from best to worst. k is taken from the model's output
    # size rather than hard-coded, so boards other than 4x4 work as well.
    ranked_pipes = torch.topk(scores, scores.shape[-1]).indices.tolist()

    # apply the rotation to the state and return the new state
    new_state = state
    result = 0
    for result in ranked_pipes:
        new_state = pipe_rotate_binary(result, state)
        if new_state not in visited:
            visited.add(new_state)
            break
    if new_state == state:
        raise Exception("chyme")
    return new_state, result


def pipe_rotate_binary(pipe: int, board: str):
    """
    Takes a binary representation of a board of pipes as a string, and a pipe to rotate. Outputs a binary representation of the board after rotating the pipe.

    :param pipe: Zero-based index of the pipe to rotate
    :param board: Binary representation of the board as a string
    :return: The board string with the chosen pipe's four values rotated clockwise
    :raises IndexError: If ``pipe`` is negative or lies beyond the end of ``board``

    """
    # each pipe has 4 values associated to it, so pipe n starts at index 4 * n
    start_index = 4 * pipe
    end_index = start_index + 4

    # A negative index would otherwise be interpreted as "from the end" by the
    # slices below and silently produce a board of the wrong length.
    if pipe < 0 or end_index > len(board):
        raise IndexError(
            f"pipe index {pipe} is out of range for a board of {len(board)} bits"
        )

    up, right, down, left = board[start_index:end_index]

    # rotate clockwise: every value moves one position along (up, right, down, left)
    return board[:start_index] + left + up + right + down + board[end_index:]

In [None]:
# Keep applying the model's chosen rotation until the goal board is reached,
# counting how many moves that takes.
visited.add(initial)
state, moves = initial, 0
while state != goal:
    state, result = pick_move(state)
    moves += 1
    visited.add(state)
    print(state, result)
print(f"moves: {moves}")