In [None]:
# Colab-only: mount Google Drive at /content/drive so the files referenced
# under /content/drive/... later in the notebook can be read.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install -q torch_geometric

In [85]:
import tensorflow as tf
import torch
import torch.nn as nn
import torch.nn.functional as F
import networkx as nx
import numpy as np
import pickle

In [77]:
class Autoencoder(nn.Module):
    """Encoder/decoder pair built from the same size arguments.

    Args:
        input_dim: Forwarded to both ``Encoder`` and ``Decoder``.
        latent_dims: Forwarded to both ``Encoder`` and ``Decoder``.
    """

    def __init__(self, input_dim, latent_dims):
        super().__init__()
        # Attribute names are part of the saved-model layout; keep them stable.
        self.encoder = Encoder(input_dim, latent_dims)
        self.decoder = Decoder(input_dim, latent_dims)

    def forward(self, x):
        """Run ``x`` through the encoder and return the decoder's output."""
        latent_code = self.encoder(x)
        reconstruction = self.decoder(latent_code)
        return reconstruction

class Decoder(nn.Module):
    """Maps a latent code to ``input_dim`` values through a 512-unit layer.

    Args:
        input_dim: Size of the produced output vector.
        latent_dims: Size of the latent code consumed by ``forward``.
    """

    def __init__(self, input_dim, latent_dims):
        super().__init__()
        self.linear1 = nn.Linear(latent_dims, 512)
        self.linear2 = nn.Linear(512, input_dim)

    def forward(self, z):
        """Return ``sigmoid(linear2(relu(linear1(z))))``; values lie in [0, 1]."""
        hidden = F.relu(self.linear1(z))
        return torch.sigmoid(self.linear2(hidden))

class Encoder(nn.Module):
    """Maps an input to a latent code through a 512-unit hidden layer.

    Args:
        input_dim: Number of values per sample after flattening.
        latent_dims: Size of the produced latent code.
    """

    def __init__(self, input_dim, latent_dims):
        super().__init__()
        self.linear1 = nn.Linear(input_dim, 512)
        self.linear2 = nn.Linear(512, latent_dims)

    def forward(self, x):
        """Flatten everything after dim 0, then apply linear1 -> ReLU -> linear2."""
        flat = x.flatten(start_dim=1)
        hidden = F.relu(self.linear1(flat))
        return self.linear2(hidden)

class BaseNetwork(nn.Module):
  """
  Two stacked linear layers followed by a softmax over the output features.

  No non-linearity is applied between the two linear layers; the only
  activation is the final softmax.
  """
  def __init__(self, in_features, hidden_size, out_features):
    """
    Initializes the network architecture.

    Args:
        in_features: Number of features in the input data.
        hidden_size: Number of units produced by the first linear layer.
        out_features: Number of features in the output layer.
    """
    super(BaseNetwork, self).__init__()

    # Define layers
    self.linear1 = nn.Linear(in_features, hidden_size)
    self.linear2 = nn.Linear(hidden_size, out_features)
    # Softmax over the LAST dimension so every sample gets its own
    # distribution. The previous dim=0 was only correct for unbatched 1-D
    # inputs; with a (batch_size, in_features) input it normalized across the
    # batch instead. For 1-D inputs dim=-1 and dim=0 are the same axis.
    self.activation = nn.Softmax(dim=-1)

  def forward(self, x):
    """
    Defines the forward pass of the network.

    Args:
        x: Input tensor of shape (batch_size, in_features) or (in_features,).

    Returns:
        out: Tensor of shape (batch_size, out_features) or (out_features,)
            whose entries along the last dimension sum to 1.
    """
    # First linear layer (no activation in between)
    hidden = self.linear1(x)

    # Output layer followed by the softmax
    out = self.activation(self.linear2(hidden))

    return out

In [None]:
# Keras model used to sample graph structures.
structure_generator = tf.keras.models.load_model('/content/drive/MyDrive/Classes/Spring 24/COMP 459/Project/Models/decoder.h5')

# Both .pth files are used below as callable modules (``.decoder(...)`` and
# ``model(...)``), i.e. they are fully pickled nn.Module objects rather than
# state dicts. Since PyTorch 2.6 ``torch.load`` defaults to weights_only=True,
# which refuses to unpickle such objects, so the flag is passed explicitly.
# NOTE: weights_only=False runs arbitrary pickle code - only load files you trust.
# The classes defined above must be in scope for unpickling to succeed.
feature_generator = torch.load('/content/drive/MyDrive/Classes/Spring 24/COMP 459/Project/Models/autoencoder.pth', map_location=torch.device('cpu'), weights_only=False)
node_labeler = torch.load('/content/drive/MyDrive/Classes/Spring 24/COMP 459/Project/Models/node_label_embeder.pth', map_location=torch.device('cpu'), weights_only=False)

In [None]:
LATENT_DIM = 400  # size of the latent vector fed to the structure generator
BOND_DIM = 5      # number of channels along axis 1 of the adjacency output

def generate_structure (structure_generator):
  """
  Samples a random latent vector and decodes it into an undirected graph.

  Args:
      structure_generator: Model whose ``predict`` returns a pair
          (adjacency, features) for a latent batch of shape (1, LATENT_DIM).

  Returns:
      A ``networkx.Graph`` built from the boolean adjacency matrix, restricted
      to nodes that take part in at least one edge.
  """
  # Generate structure
  z = tf.random.normal((1, LATENT_DIM))
  # The second output (reconstructed features) is not needed here.
  reconstruction_adjacency, _ = structure_generator.predict(z)

  # Process adjacency matrix: take the arg-max channel per node pair and mark
  # an edge wherever that channel index is below BOND_DIM - 1.
  # NOTE(review): presumably the last channel encodes "no bond" - confirm
  # against the model's training data.
  bond_type = tf.argmax(reconstruction_adjacency, axis=1)[0]
  adjacency = np.array(bond_type < BOND_DIM - 1)

  # Filter out unconnected nodes. A single mask is applied to rows AND columns
  # so the result is always square; independent row/column masks could yield
  # a non-square matrix (rejected by nx.Graph) if the prediction is not
  # perfectly symmetric. For symmetric input the result is unchanged.
  connected = adjacency.any(axis=0) | adjacency.any(axis=1)
  adjacency = adjacency[np.ix_(connected, connected)]

  # Get NetworkX graph
  return nx.Graph(adjacency)

# Sample one structure from the loaded generator and plot it.
graph = generate_structure(structure_generator)
nx.draw(graph)

In [None]:
max_len = 10  # number of equal chunks the decoded vector is cut into

def revert_onehot(tensor):
    """Collapse a concatenated one-hot style vector into index triples.

    The squeezed input is cut into ``max_len`` chunks; every chunk is divided
    into segments of 4, 28 and 28 entries and each segment is replaced by the
    position of its largest value.

    NOTE(review): the original local names (e, v1, v2) suggest an edge type
    plus two vertex labels - confirm against the training code.

    Args:
        tensor: Tensor that squeezes to 1-D with ``max_len * 60`` entries.

    Returns:
        Integer tensor of shape ``(max_len, 3)``.
    """
    flat = tensor.squeeze()
    decoded_rows = []
    for row in torch.chunk(flat, max_len):
        segments = torch.split(row, [4, 28, 28])
        indices = tuple(torch.argmax(segment) for segment in segments)
        decoded_rows.append(torch.tensor(indices))
    return torch.stack(decoded_rows)

def generate_feature_set(feature_generator):
    """Decode one random latent sample into an index table.

    Args:
        feature_generator: Object exposing a ``decoder`` callable that accepts
            a ``(1, 2)`` latent tensor.

    Returns:
        Whatever ``revert_onehot`` produces for the decoded vector.
    """
    # Latent size is fixed at 2 here.
    # NOTE(review): presumably this matches the trained autoencoder - confirm.
    latent_dims = 2
    latent_sample = torch.randn(1, latent_dims)

    decoded = feature_generator.decoder(latent_sample)
    return revert_onehot(decoded)

# Sample one feature set; as the cell's last expression its value is displayed.
generate_feature_set(feature_generator)

In [82]:
import random

def re_name_walk(walk):
    """Relabel a walk by order of first appearance.

    Every distinct node is replaced by the number of distinct nodes seen
    before it, so ``[5, 3, 5]`` becomes ``[0, 1, 0]``. The output always has
    the same length as the input.

    Args:
        walk: Iterable of hashable node identifiers.

    Returns:
        List of ints, one per entry of ``walk``.
    """
    mapping = {}
    new_walk = []
    for node in walk:
        # Only assign a fresh id on first sight. Previously a revisited node
        # was appended twice and its id overwritten, which lengthened the
        # walk and made the labels inconsistent.
        if node not in mapping:
            mapping[node] = len(mapping)
        new_walk.append(mapping[node])
    return new_walk

def random_walk(G, start_node, walk_length):
    """Take up to ``walk_length`` uniformly random steps from ``start_node``.

    Args:
        G: Graph object providing ``neighbors(node)``.
        start_node: Node the walk begins at.
        walk_length: Maximum number of steps to take.

    Returns:
        The visited nodes (start node included, so at most
        ``walk_length + 1`` of them) after relabeling by ``re_name_walk``.
    """
    visited = [start_node]
    for _ in range(walk_length):
        candidates = list(G.neighbors(visited[-1]))
        # A node without neighbours ends the walk early.
        if len(candidates) == 0:
            break
        visited.append(random.choice(candidates))
    return re_name_walk(visited)

def get_node_label_fr_walk(model, walk, feature_tnsr):
    """Feed a walk and a feature tensor to ``model`` as one flat vector.

    Args:
        model: Callable applied to the concatenated 1-D tensor.
        walk: Sequence of numbers, converted to a float tensor.
        feature_tnsr: Tensor that is flattened and appended after the walk.

    Returns:
        Whatever ``model`` returns for the concatenated input.
    """
    walk_part = torch.tensor(walk).float()
    feature_part = feature_tnsr.flatten()
    return model(torch.cat([walk_part, feature_part]))

In [113]:
walk_length = 3  # number of steps per walk and of walk entries fed to the labeler

def label_nodes (graph, node_labeler, feature_generator):
  """
  Predicts a label for every node and stores it as node attribute "x".

  For each node a random walk is started at it, a fresh feature set is
  sampled, and the arg-max of the labeler's output is taken as the label.
  The graph is modified in place; nothing is returned.

  Args:
      graph: NetworkX graph whose nodes are to be labeled.
      node_labeler: Model passed to ``get_node_label_fr_walk``.
      feature_generator: Model passed to ``generate_feature_set``.
  """
  node_features = {}

  # Iterate over the real node identifiers instead of range(len(...)), which
  # silently assumed the nodes are named exactly 0..n-1. For graphs built
  # from an adjacency array the iteration order is identical.
  for node in graph.nodes:
    walk = random_walk(graph, node, walk_length)
    feature_set = generate_feature_set(feature_generator)
    # random_walk returns the start node plus up to walk_length steps; only
    # the first walk_length entries are given to the labeler.
    # NOTE(review): a walk that ends early yields fewer entries - presumably
    # the labeler expects a fixed input size; confirm.
    preds = get_node_label_fr_walk(node_labeler, walk[:walk_length], feature_set)
    node_features[node] = torch.argmax(preds).item()

  nx.set_node_attributes(graph, node_features, name="x")

# Label the nodes of the current `graph` in place (sets node attribute "x").
label_nodes(graph, node_labeler, feature_generator)

In [None]:
# from torch_geometric.datasets import ZINC
# dataset = ZINC(root = 'data', split='train')

# edge_features = np.empty((2, 0))
# edge_labels = np.empty((1,0))

# ngraphs = 10000

# for idx in range(ngraphs):

#     graph = dataset[idx]

#     features = np.empty((2, len(graph.edge_attr)))

#     for edge_idx, edge  in enumerate(graph.edge_index.transpose(0, 1)):
#         features[:, edge_idx] = [graph.x[edge[0]].item(), graph.x[edge[1]].item()]

#     edge_features = np.append(edge_features, features, axis = 1)
#     edge_labels = np.append(edge_labels, graph.edge_attr)

# from sklearn.tree import DecisionTreeClassifier
# from sklearn.model_selection import train_test_split

# classifier = DecisionTreeClassifier()
# x_train, x_test, y_train, y_test = train_test_split(edge_features.T, edge_labels)

# classifier.fit(x_train, y_train)
# print("Accuracy:", classifier.score(x_test, y_test))

# edge_labeler = classifier

# Load the previously fitted edge classifier from Drive.
# NOTE: pickle.load executes arbitrary code from the file - only load files you trust.
with open('/content/drive/MyDrive/Classes/Spring 24/COMP 459/Project/Models/edge_labeler.pkl', 'rb') as f:
    edge_labeler = pickle.load(f)

In [124]:
def label_edges(graph, edge_labeler):
    """Predict a label for every edge and store it as edge attribute 'x'.

    The classifier is called once per edge with a single row holding the 'x'
    attributes of the edge's two endpoints. The graph is modified in place.

    Args:
        graph: Graph whose nodes already carry an 'x' attribute.
        edge_labeler: Object with a ``predict`` method whose result supports
            ``.item()``.
    """
    for source, target in graph.edges:
        endpoint_labels = [graph.nodes[source]['x'], graph.nodes[target]['x']]
        prediction = edge_labeler.predict([endpoint_labels])
        graph[source][target]['x'] = prediction.item()

# Label the edges of the current `graph` in place (sets edge attribute "x").
label_edges(graph, edge_labeler)

In [129]:
def generate(structure_generator, feature_generator, node_labeler, edge_labeler):
    """Run the full pipeline: sample a structure, then label nodes and edges.

    Args:
        structure_generator: Model handed to ``generate_structure``.
        feature_generator: Model handed to ``label_nodes``.
        node_labeler: Model handed to ``label_nodes``.
        edge_labeler: Classifier handed to ``label_edges``.

    Returns:
        The sampled graph after both labeling passes have modified it in place.
    """
    labeled_graph = generate_structure(structure_generator)
    # Nodes are labeled first: edge labeling reads the node attribute 'x'.
    label_nodes(labeled_graph, node_labeler, feature_generator)
    label_edges(labeled_graph, edge_labeler)
    return labeled_graph

# End-to-end run: replaces `graph` with a freshly sampled, fully labeled graph.
graph = generate(structure_generator, feature_generator, node_labeler, edge_labeler)