In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F

import networkx as nx
import numpy as np
from sklearn.utils import shuffle
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import matplotlib.pyplot as plt

from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.tree import DecisionTreeClassifier

In [2]:
from torch.utils.data import Dataset
from torchvision import datasets
from torchvision.transforms import ToTensor

training_data = datasets.MNIST(
    root="data",
    train=True,
    download=True,
    transform=ToTensor()
)

test_data = datasets.MNIST(
    root="data",
    train=False,
    download=True,
    transform=ToTensor()
)

Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz to data/MNIST/raw/train-images-idx3-ubyte.gz


100%|███████████████████████████| 9912422/9912422 [00:00<00:00, 41009250.47it/s]


Extracting data/MNIST/raw/train-images-idx3-ubyte.gz to data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz to data/MNIST/raw/train-labels-idx1-ubyte.gz


100%|███████████████████████████████| 28881/28881 [00:00<00:00, 20853106.18it/s]


Extracting data/MNIST/raw/train-labels-idx1-ubyte.gz to data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz to data/MNIST/raw/t10k-images-idx3-ubyte.gz


100%|███████████████████████████| 1648877/1648877 [00:00<00:00, 28613890.10it/s]

Extracting data/MNIST/raw/t10k-images-idx3-ubyte.gz to data/MNIST/raw






Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz to data/MNIST/raw/t10k-labels-idx1-ubyte.gz


100%|█████████████████████████████████| 4542/4542 [00:00<00:00, 11407502.26it/s]


Extracting data/MNIST/raw/t10k-labels-idx1-ubyte.gz to data/MNIST/raw



In [3]:
device = "mps"

x_train, y_train = (training_data.data.float()/255).reshape(-1, 28*28), training_data.targets.long()
x_test, y_test = (test_data.data.float()/255).reshape(-1, 28*28).to(device), test_data.targets.long().to(device)

In [4]:
x_train, y_train = shuffle(x_train, y_train)
limits = 3200

In [5]:
from AIN import Adaptive

In [6]:
model = nn.Sequential(
    Adaptive(28 * 28, 50, loops=1, directional="random", selfLoops=True),
    nn.ReLU(),
    nn.Linear(50, 10)
)

In [7]:
# model = StandardNN(28*28, 100, 10).to(device) 
model = model.to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-2)
loss_fn = nn.CrossEntropyLoss()

epochs = 10
batch_size = 64
logs = {
    "training loss": [],
    "testing loss": [],
    "testing accuracy": [],
    "testing precision": [],
    "graph edges": []
}

x_train = x_train[:limits]
y_train = y_train[:limits]

for epoch in range(epochs):
    model.train()
    running_loss = 0
    x_train, y_train = shuffle(x_train, y_train)
    for i in range(0, x_train.shape[0]//batch_size):
        batch_num = i * batch_size
        x, y = x_train[batch_num:batch_num+batch_size], y_train[batch_num:batch_num+batch_size]
        x = x.to(device)
        y = y.to(device)
        optimizer.zero_grad(set_to_none=True)
        pred = model(x)
        loss = loss_fn(pred, y)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
        
        print(f"EPOCH: {epoch+1}/{epochs} | BATCH: {i+1}/{x_train.shape[0]//batch_size} | LOSS: {running_loss / (i+1):.5f}", end="\r", flush=True)
    logs["training loss"].append(running_loss/(x_train.shape[0]//batch_size))
    model.eval()
    test_pred = model(x_test)
    loss_te = loss_fn(test_pred, y_test)
    test_pred = test_pred.detach().cpu().numpy().argmax(1)
    accy = accuracy_score(test_pred, y_test.detach().cpu().numpy())
    prec = precision_score(test_pred, y_test.detach().cpu().numpy(), average="weighted")
    print(f"EPOCH: {epoch+1}/{epochs} | TEST LOSS: {loss_te.item():.5f} | TEST ACCY: {accy:.5f} | TEST PREC: {prec:.5f}")#" | CONNECTIONS: {model.get_connection_count()}")
    logs["testing loss"].append(loss_te.item())
    logs["testing accuracy"].append(accy)
    logs["testing precision"].append(prec)
    #logs["graph edges"].append(model.get_connection_count())
    # model.gen_nx(f"vis_graph/{epoch+1}.png")
    running_loss = 0

EPOCH: 1/10 | TEST LOSS: 1.12186 | TEST ACCY: 0.67360 | TEST PREC: 0.70183
EPOCH: 2/10 | TEST LOSS: 0.72535 | TEST ACCY: 0.80930 | TEST PREC: 0.81742
EPOCH: 3/10 | TEST LOSS: 0.67104 | TEST ACCY: 0.83940 | TEST PREC: 0.84753
EPOCH: 4/10 | TEST LOSS: 0.64712 | TEST ACCY: 0.85140 | TEST PREC: 0.85501
EPOCH: 5/10 | TEST LOSS: 0.60982 | TEST ACCY: 0.86860 | TEST PREC: 0.87202
EPOCH: 6/10 | TEST LOSS: 0.61522 | TEST ACCY: 0.87770 | TEST PREC: 0.87958
EPOCH: 7/10 | TEST LOSS: 0.65126 | TEST ACCY: 0.87550 | TEST PREC: 0.87773
EPOCH: 8/10 | TEST LOSS: 0.69307 | TEST ACCY: 0.87060 | TEST PREC: 0.87437
EPOCH: 9/10 | TEST LOSS: 0.70646 | TEST ACCY: 0.88510 | TEST PREC: 0.88873
EPOCH: 10/10 | TEST LOSS: 0.72217 | TEST ACCY: 0.87940 | TEST PREC: 0.88243


In [13]:
from Utils import adaptiveThresholding

In [91]:
def exportGraph(model) -> nx.Graph:
    """
    Exports a graph from a given model, where nodes represent the model's neurons and edges represent the connections between them.

    Args:
        model (AIN_Base): An instance of the AIN_Base class.

    Returns:
        nx.Graph: A NetworkX graph object representing the model's graph.
    """
    
    # Convert the node weights and adjacency matrix to numpy arrays on the CPU
    nodeWeights = model.nodeWeights.detach().cpu().numpy()
    adjacencyMat = adaptiveThresholding(model.adjacencyMat).detach().cpu().numpy()

    # Create a networkx graph object based on the specified graph directionality
    if model.directional == "bi":
        # For a bidirectional graph, the adjacency matrix is made symmetric
        # and an undirected graph is created
        adjacencyMat = np.triu(adjacencyMat) + np.triu(adjacencyMat).T
        graph = nx.Graph()
    elif model.directional == "random":
        # For a randomly directed graph, a directed graph is created
        graph = nx.DiGraph()
    elif model.directional == "uni":
        # For a unidirectional graph, only the upper triangle of the
        # adjacency matrix is used, and a directed graph is created
        adjacencyMat = np.triu(adjacencyMat)
        graph = nx.DiGraph()

    # Get the indices and weights of the non-zero elements of the adjacency matrix
    edges = np.where(adjacencyMat > 0)
    edgeWeights = adjacencyMat[edges[0], edges[1]]

    # Normalize the edge weights and combine the indices and weights into a single array
    edgeWeightsNorm = edgeWeights / edgeWeights.max()
    weightedEdge = tuple(np.concatenate([
        edges[0].reshape(-1, 1), 
        edges[1].reshape(-1, 1),
        edgeWeightsNorm.reshape(-1, 1)
    ], 1))

    # Add the weighted edges to the graph and update node weights
    graph.add_weighted_edges_from(weightedEdge, weight="edge weight")
    for i, w in enumerate(nodeWeights):
        graph.nodes[i].update({"node weight": w})

    # Return the final graph
    return graph

In [93]:
def countConnections(model) -> int:
    nodeWeights = model.nodeWeights.detach().cpu().numpy()
    adjacencyMat = adaptiveThresholding(model.adjacencyMat).detach().cpu().numpy()

    if model.directional == "bi":
        adjacencyMat = np.triu(adjacencyMat) + np.triu(adjacencyMat).T
    elif model.directional == "uni":
        adjacencyMat = np.triu(adjacencyMat)

    return np.where(adjacencyMat > 0)[0].shape[0]

In [86]:
g = None
for m in model.children():
    if type(m) == Adaptive:
        g = m

In [94]:
countConnections(g)

296852

In [109]:
import numpy as np

class ImagePositionalEncoding(nn.Module):
    def __init__(self, height: int, width: int, channels: int):
        super().__init__()
        self.height = height
        self.width = width
        self.channels = channels
        self.pos_encoding = self.generate_positional_encoding()

    def generate_positional_encoding(self):
        def angle(h, w, c):
            return (h + w) / np.power(10000, (2 * (c // 2)) / self.channels)

        pos_encoding = np.fromfunction(angle, (self.height, self.width, self.channels), dtype=np.float32)
        pos_encoding[0::2] = np.sin(pos_encoding[0::2])
        pos_encoding[1::2] = np.cos(pos_encoding[1::2])

        return torch.tensor(pos_encoding, dtype=torch.float32).unsqueeze(0)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return x + self.pos_encoding.to(x.device)


class OneDimensionalPositionalEncoding(nn.Module):
    def __init__(self, sequence_length: int, d_model: int):
        super().__init__()
        self.sequence_length = sequence_length
        self.d_model = d_model
        self.pos_encoding = self.generate_positional_encoding()

    def generate_positional_encoding(self):
        def angle(pos, i):
            return pos / np.power(10000, (2 * (i // 2)) / self.d_model)

        pos_encoding = np.fromfunction(angle, (self.sequence_length, self.d_model), dtype=np.float32)
        pos_encoding[:, 0::2] = np.sin(pos_encoding[:, 0::2])
        pos_encoding[:, 1::2] = np.cos(pos_encoding[:, 1::2])

        return torch.tensor(pos_encoding, dtype=torch.float32).unsqueeze(0)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return x + self.pos_encoding.to(x.device)


In [124]:
class PositionalEncoding2D(nn.Module):
    def __init__(self, height: int, width: int, channels: int) -> None:
        """
        Initializes a 2D positional encoding layer.

        Args:
        - height (int): The height of the input image.
        - width (int): The width of the input image.
        - channels (int): The number of channels in the input image.

        Returns:
        - None
        """
        super().__init__()
        self.height = height
        self.width = width
        self.channels = channels
        self.pos_encoding = self.generate_positional_encoding()

    def generate_positional_encoding(self):
        """
        Generates the positional encoding matrix.

        Args:
        - None

        Returns:
        - pos_encoding (Tensor): The positional encoding matrix with shape (1, height, width, channels).
        """
        # Function to compute the angle for positional encoding
        def angle(h, w, c):
            return (h + w) / np.power(10000, (2 * (c // 2)) / self.channels)

        # Create the positional encoding matrix using the angle function
        pos_encoding = np.fromfunction(angle, (self.height, self.width, self.channels), dtype=np.float32)
        pos_encoding[0::2] = np.sin(pos_encoding[0::2])
        pos_encoding[1::2] = np.cos(pos_encoding[1::2])

        # Convert the positional encoding matrix to a PyTorch tensor and add a batch dimension
        return torch.tensor(pos_encoding, dtype=torch.float32).unsqueeze(0)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Computes the output of the 2D positional encoding layer.

        Args:
        - x (Tensor): The input tensor with shape (batch_size, channels, height, width).

        Returns:
        - Tensor: The output tensor with shape (batch_size, channels, height, width).
        """
        # Add the positional encoding to the input tensor and return the result
        x = x.permute(0, 2, 3, 1)  # Change the input tensor shape to (batch, height, width, channels)
        x = x + self.pos_encoding.to(x.device)
        x = x.permute(0, 3, 1, 2)  # Change the output tensor shape back to (batch, channels, height, width)
        return x

In [125]:
enc = PositionalEncoding2D(28, 28, 1)

In [127]:
enc(x_train[:32].reshape(-1, 28, 28).unsqueeze(1)).shape

torch.Size([32, 1, 28, 28])

In [117]:
oned = OneDimensionalPositionalEncoding(28*28, 1)

In [119]:
oned(x_train[:32].unsqueeze(-1)).shape

torch.Size([32, 784, 1])