In [1]:
# Dependencies

import torch
import networkx as nx
from sklearn.base import BaseEstimator, ClassifierMixin
from datasets import load_dataset
from sklearn.model_selection import cross_val_score, ShuffleSplit, cross_val_predict
from graph import process_dataset
from sklearn.metrics import confusion_matrix, accuracy_score
import sys


# Put the parent directory on the import path before the two imports below.
# NOTE(review): presumably `thdc` and `hdc` live in that directory — confirm.
sys.path.append("../")

import thdc
from hdc import pm


# Allocate newly created tensors on the GPU when CUDA is available, else on the CPU.
torch.set_default_device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Dataset under evaluation. Keep exactly one assignment active; the commented
# lines are alternative datasets (note that AIDS is read from its "full" split).
DATASET = load_dataset("graphs-datasets/MUTAG")["train"]
# DATASET = load_dataset("graphs-datasets/AIDS")["full"]
# DATASET = load_dataset("graphs-datasets/PROTEINS")["train"]

In [None]:
# Experiment configuration.
#   FOLDS      - presumably the number of cross-validation splits (TODO confirm)
#   REPS       - how many times the whole cross-validation run is repeated
#   DIMENSIONS - length of every vector in VECTORS (also passed to pm())
FOLDS, REPS, DIMENSIONS = 10, 3, 10000
(graphs, labels) = process_dataset(DATASET)

# Random bipolar (+1 / -1) vectors, one per row. Factory functions such as
# torch.randint allocate on the current default device, so no explicit .cuda()
# is needed; an unconditional .cuda() raises on machines without CUDA.
# NOTE(review): 31 rows — presumably (largest node label + 1); confirm against
# process_dataset, since encode() indexes rows by node label.
VECTORS = torch.randint(0, 2, (31, DIMENSIONS), dtype=torch.float64) * 2 - 1

# torch.from_numpy always produces a CPU tensor; move it next to VECTORS so the
# matmul in encode() sees both operands on the same device.
MAT = torch.from_numpy(pm(DIMENSIONS)).to(VECTORS.device)

In [None]:
import random


def create_map(length):
    """Build a random mapping from ``0..length-1`` to distinct integers.

    Args:
        length: Number of entries; the keys are ``0, 1, ..., length - 1``.

    Returns:
        A dict whose values are distinct integers drawn without replacement
        from ``range(1000)``.

    Raises:
        ValueError: If ``length`` is greater than 1000 (there are not enough
            distinct values), or if ``random.sample`` rejects ``length``.
    """
    if length > 1000:
        # The message now states the limit that is actually enforced.
        raise ValueError("Length should be less than or equal to 1000")

    return dict(enumerate(random.sample(range(1000), length)))


def encode(graph, vectors, mat):
    """Encode a graph as a single vector by folding its BFS layers together.

    The graph is traversed with ``nx.bfs_layers`` starting from the first node
    in ``graph``'s iteration order. For each layer, the rows of ``vectors``
    selected by that layer's node labels are summed. For every layer after the
    first, the running result is first multiplied by ``mat`` and then added to
    that layer's rows.

    Args:
        graph: networkx graph whose node labels are integers usable as row
            indices into ``vectors``.
        vectors: 2-D tensor; row ``i`` is the vector for node label ``i``.
        mat: Matrix applied to the running result via ``torch.matmul``.
            NOTE(review): presumably a permutation matrix from ``pm`` — confirm.

    Yields:
        Exactly one tensor: the encoding of ``graph``. (A generator is kept
        because callers iterate over the result.)

    Raises:
        IndexError: If ``graph`` has no nodes.
    """
    # NOTE: an earlier version called
    #     nx.relabel_nodes(graph, create_map(len(list(graph))))
    # here and discarded the result. relabel_nodes returns a relabelled *copy*
    # by default, so that call never changed `graph`; it only consumed random
    # numbers. It has been removed. If random relabelling is wanted, the new
    # labels must stay valid row indices into `vectors`.
    source = list(graph)[0]

    G = None
    # Only nodes reached by the BFS from `source` contribute to the encoding.
    for layer in nx.bfs_layers(graph, [source]):
        rows = torch.index_select(vectors, 0, torch.tensor(layer))
        if G is not None:
            # Carry the previous layers forward as one extra row before summing.
            rows = torch.cat([torch.matmul(G, mat)[None, :], rows], 0)
        G = torch.sum(rows, dim=0)
    yield G

In [None]:
class GraphClassifier(BaseEstimator, ClassifierMixin):
    """Binary graph classifier backed by a ``thdc.ItemMemory``.

    Training graphs are encoded with ``encode(graph, VECTORS, MAT)`` and added
    to the memory under the key ``str(label)``. Prediction encodes the query
    graph, asks the memory for 5 entries via ``cleanup_all`` and takes a
    weighted vote between label ``"1"`` and everything else.
    """

    def __init__(self):
        # Kept so that `memory` exists even before fit() is called.
        self.memory = thdc.ItemMemory()

    def fit(self, X, y):
        """Fill the item memory from training graphs.

        Args:
            X: Indexable sequence of graphs accepted by ``encode``.
            y: Labels, indexed in parallel with ``X``.

        Returns:
            ``self``, following the scikit-learn convention.
        """
        # Start from an empty memory so that refitting the same instance does
        # not mix in vectors from a previous fit.
        self.memory = thdc.ItemMemory()
        for i, graph in enumerate(X):
            for encoded in encode(graph, VECTORS, MAT):
                self.memory.add_vector(str(y[i]), encoded)
        return self

    def predict(self, X):
        """Predict a 0/1 label for every graph in ``X``.

        Args:
            X: Iterable of graphs accepted by ``encode``.

        Returns:
            A list of ints (``1`` or ``0``), one per encoded query.
        """
        predictions = []
        for query in X:
            for encoded in encode(query, VECTORS, MAT):
                # NOTE(review): each entry is indexed as entry[0] (label) and
                # entry[2] (presumably a similarity score) — confirm against
                # thdc.ItemMemory.cleanup_all.
                matches = self.memory.cleanup_all(encoded, 5)
                # Label "1" votes positive, every other label negative; each
                # vote is weighted by entry[2] raised to the 4th power.
                vote = sum(
                    m[2] ** 4 if m[0] == "1" else -1 * m[2] ** 4 for m in matches
                )
                # A tie (vote == 0) resolves to class 1.
                predictions.append(1 if vote >= 0 else 0)

        return predictions

In [None]:
def main():
    """Run ``REPS`` rounds of shuffle-split cross-validation and print accuracies.

    Each round builds a fresh ``GraphClassifier``, scores it on ``graphs`` /
    ``labels`` with ``FOLDS`` random splits, and prints the mean score. The
    average of the per-round means is printed at the end.
    """
    # Collect per-round means in a list; this avoids shadowing the builtin `sum`.
    rep_means = []
    for _ in range(REPS):
        clf = GraphClassifier()
        scores = cross_val_score(
            clf,
            graphs,
            labels,
            # Use the configured fold count explicitly rather than the default.
            cv=ShuffleSplit(n_splits=FOLDS),
            n_jobs=1,
            verbose=4,
        )
        rep_mean = scores.mean()
        print("Acc =>", rep_mean)
        rep_means.append(rep_mean)
    print("Avg Acc =>", sum(rep_means) / REPS)


def conf():
    """Print the confusion matrix, then the accuracy, of cross-validated predictions.

    Predictions come from ``cross_val_predict`` with ``cv=5`` on the
    module-level ``graphs`` and ``labels``.
    """
    predicted = cross_val_predict(
        GraphClassifier(), graphs, labels, cv=5, n_jobs=1, verbose=4
    )
    for metric in (confusion_matrix, accuracy_score):
        print(metric(labels, predicted))


# Run the repeated cross-validation experiment. Uncomment conf() to also print
# a confusion matrix and accuracy computed from cross-validated predictions.
main()
# conf()