In [1]:
import torch
import torch.nn.functional as F
%pip install torch_geometric
from torch_geometric.nn import SAGEConv
import os
import json
import os
import pickle
import json
import random
import numpy as np
from torch_geometric.data import Data
import networkx as nx
import warnings
import math
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from torch.utils.data.dataloader import default_collate
from torch.utils.data import random_split

Note: you may need to restart the kernel to use updated packages.


In [2]:
def get_files_in_folder(input_folder):
    file_list = []
    for file_name in os.listdir(input_folder):
        file_path = os.path.join(input_folder, file_name)
        if os.path.isfile(file_path):
            file_list.append(file_path)
    return file_list

# Example usage:
folder_path = 'done_all'
verilog_files = get_files_in_folder(folder_path)
print(len(verilog_files))
print(verilog_files)

396
['done_all\\adder10_synth.txt', 'done_all\\adder11_synth.txt', 'done_all\\adder12_synth.txt', 'done_all\\adder13_synth.txt', 'done_all\\adder14_synth.txt', 'done_all\\adder15_synth.txt', 'done_all\\adder16_synth.txt', 'done_all\\adder17_synth.txt', 'done_all\\adder18_synth.txt', 'done_all\\adder19_synth.txt', 'done_all\\adder1_synth.txt', 'done_all\\adder20_synth.txt', 'done_all\\adder21_synth.txt', 'done_all\\adder22_synth.txt', 'done_all\\adder23_synth.txt', 'done_all\\adder24_synth.txt', 'done_all\\adder25_synth.txt', 'done_all\\adder26_synth.txt', 'done_all\\adder27_synth.txt', 'done_all\\adder28_synth.txt', 'done_all\\adder2_synth.txt', 'done_all\\adder3_synth.txt', 'done_all\\adder4_synth.txt', 'done_all\\adder5_synth.txt', 'done_all\\adder6_synth.txt', 'done_all\\adder7_synth.txt', 'done_all\\adder8_synth.txt', 'done_all\\adder9_synth.txt', 'done_all\\and10_gate_synth.txt', 'done_all\\and11_gate_synth.txt', 'done_all\\and12_gate_synth.txt', 'done_all\\and13_synth.txt', 'done

In [3]:
def extracting_attributes(verilog_file):
    try:
        if os.path.isfile(verilog_file):
            with open(verilog_file, "r") as file:
                loaded_data = json.load(file)
                nodes = loaded_data[0]
                edges = loaded_data[1]
                label = loaded_data[2]
                
                x = torch.tensor(nodes, dtype=torch.float)
                edge_index = torch.tensor(edges, dtype=torch.long)
                y = torch.tensor(label, dtype=torch.float)
                num_nodes = x.size(0)
                
                # Create batch assignment vector (assuming one graph per file)
                batch = torch.zeros(num_nodes, dtype=torch.long)
                data = Data(x=x, edge_index=edge_index, y = y, batch = batch)
                return data
    
    except Exception as e:
        print(e)
        return e


In [4]:
class VerilogDataset(Dataset):  # Using Dataset from torch_geometric
    def __init__(self, verilog_files):
        print(f"Loaded {len(verilog_files)} Verilog files.")
        self.verilog_files = verilog_files

    def __len__(self):
        return len(self.verilog_files)

    def __getitem__(self, idx):
        verilog_file = self.verilog_files[idx]
        data = extracting_attributes(verilog_file)
        return data

dataset = VerilogDataset(verilog_files)
print(len(dataset))

Loaded 396 Verilog files.
396


In [5]:
print(dataset[0])
print(verilog_files[0])
print(dataset.verilog_files[0])

Data(x=[54, 7], edge_index=[2, 72], y=[1, 16], batch=[54])
done_all\adder10_synth.txt
done_all\adder10_synth.txt


In [6]:
def are_all_data_objects_unique(dataset):
    data_objects = []
    for data in dataset:
        if data in data_objects:
            return False
        data_objects.append(data)
    return True

# Example usage:
is_unique = are_all_data_objects_unique(dataset)
if is_unique:
    print("All data objects are unique.")
else:
    print("Duplicate data objects found.")


All data objects are unique.


In [7]:
y_labels = []
for data in dataset:
    # print(data)
    # print(data.y.tolist())
    y_labels.append(np.argmax(data.y.tolist()))

In [8]:
def custom_collate(batch):
    if isinstance(batch[0], Data):
        return batch
    else:
        return default_collate(batch)
    


In [9]:
X_train, X_test, y_train, y_test = train_test_split(dataset, y_labels, test_size=0.2, stratify = y_labels, random_state=41)
train_loader = DataLoader(X_train, batch_size=16, shuffle=True, collate_fn=custom_collate)
test_loader = DataLoader(X_test, batch_size=16, shuffle = False, collate_fn=custom_collate)

In [10]:
# len(train_loader.dataset)
print(train_loader.dataset[0])

Data(x=[82, 7], edge_index=[2, 109], y=[1, 16], batch=[82])


In [11]:
print(torch.all(dataset[0].y == dataset[1].y).tolist() == True)

True


In [12]:
def generate_pairs(dataset):
    pairs = []
    pair_labels = []
    for i in range(len(dataset)):
        for j in range(i + 1, len(dataset)):
            pairs.append((dataset[i], dataset[j]))
            pair_labels.append(1 if torch.all(dataset[i].y == dataset[j].y).tolist() else 0)
    return pairs, pair_labels

# Example usage
pairs, pair_labels = generate_pairs(X_train)


In [13]:
import torch.nn.functional as F
from torch_geometric.nn import GCNConv, global_mean_pool

class SiameseGNN(torch.nn.Module):
    def __init__(self, hidden_channels):
        super(SiameseGNN, self).__init__()
        torch.manual_seed(12345)
        self.conv1 = GCNConv(7, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)

    def forward(self, data):
        x, edge_index = data.x, data.edge_index
        x = self.conv1(x, edge_index)
        x = x.relu()
        x = self.conv2(x, edge_index)
        # print(x)
        x = global_mean_pool(x, data.batch)  # [batch_size, hidden_channels]
        return x

    def compute_similarity(self, data1, data2):
        emb1 = self.forward(data1)
        emb2 = self.forward(data2)
        return F.cosine_similarity(emb1, emb2), emb1, emb2


In [14]:
data_pairs = [(data1, data2) for data1, data2 in pairs]
print(data_pairs[1234])
labels = torch.tensor(pair_labels, dtype=torch.float32)
# print(labels)

(Data(x=[17, 7], edge_index=[2, 23], y=[1, 16], batch=[17]), Data(x=[26, 7], edge_index=[2, 36], y=[1, 16], batch=[26]))


In [15]:
def contrastive_loss(similarity, label, margin=0.5):
    loss = (label * (1 - similarity)**2 + (1 - label) * F.relu(similarity - margin)**2).mean()
    return loss


In [28]:
from torch_geometric.loader import DataLoader

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
siamese_model = SiameseGNN(hidden_channels=32).to(device)
optimizer = torch.optim.Adam(siamese_model.parameters(), lr=0.01)

def train(model, data_pairs, labels):
    model.train()
    total_loss = 0
    for i, (data1, data2) in enumerate(data_pairs):
        data1, data2 = data1.to(device), data2.to(device)
        optimizer.zero_grad()
        if i % 10000 == 0:
            print(i)
        # print(data1.y)
        # print(data2.y)
        similarity, emb1, emb2 = model.compute_similarity(data1, data2)
        # print(similarity)
        loss = contrastive_loss(similarity, labels[i].to(device))
        # print(loss)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(data_pairs)

# Create DataLoader for pairs
data_pairs = [(data1, data2) for data1, data2 in pairs]
# print(data_pairs)
labels = torch.tensor(pair_labels, dtype=torch.float32)
# loader = DataLoader(data_pairs, batch_size=32, shuffle=True)

# Train the model
flag = 0
for epoch in range(100):
    loss = train(siamese_model, data_pairs, labels)
    if flag == 2:
        break
    if loss < 0.3:
        flag += 1
    
    print(f'Epoch {epoch}, Loss: {loss:.4f}')


0
10000
20000
30000
40000
Epoch 0, Loss: 0.0264
0
10000
20000
30000
40000
Epoch 1, Loss: 0.0182
0
10000
20000
30000
40000


In [42]:
graph_embeddings = []
for data in X_train:
    data = data.to(device)
    graph_embeddings.append(siamese_model.forward(data).detach().cpu().numpy())
    

In [43]:
def compute_MSE(v1, v2):
    squared_diff = (v1 - v2) ** 2

# Step 2: Calculate mean squared error
    mse = np.mean(squared_diff)

# Step 3 (optional): Compute root mean squared error (RMSE)
    rmse = np.sqrt(mse)
    return rmse

In [47]:
correct = 0
siamese_model.eval()
with torch.no_grad():
    for data in X_test:
        data = data.to(device)
        res = siamese_model.forward(data).detach().cpu().numpy()
        min_loss = math.inf
        match = []
        for i, em in enumerate(graph_embeddings):
            diff = compute_MSE(em, res)
            if diff <min_loss:
                min_loss = diff
                match = X_train[i]
        
        if torch.all(data.y == match.y):
            correct += 1

accuracy = correct/len(X_test)
print(accuracy)

0.8


In [49]:
torch.save(siamese_model.state_dict(), 'siamese_80.pth')

In [29]:
class ProjectionHead(torch.nn.Module):
    def __init__(self, input_dim, output_dim):
        super(ProjectionHead, self).__init__()
        self.fc1 = torch.nn.Linear(input_dim, 128)
        self.fc2 = torch.nn.Linear(128, output_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x


In [30]:
class ContrastiveGNN(torch.nn.Module):
    def __init__(self, hidden_channels, output_dim, siamese_model):
        super(ContrastiveGNN, self).__init__()
        self.siamese_gnn = siamese_model
        self.projection_head = ProjectionHead(hidden_channels, output_dim)

    def forward(self, data):
        x = self.siamese_gnn(data)
        x = self.projection_head(x)
        return x
