# Capstone Project Test Running Time

# Installation

In [30]:
# !pip install torch_geometric

# Libraries

In [31]:
import os
import torch
print(torch.__version__)
from torch_geometric.nn import GATConv
from torch_geometric.datasets import Planetoid
from torch_geometric.transforms import NormalizeFeatures
import torch.nn.functional as F
import time
import subprocess
import platform
import psutil
import numpy as np

2.7.0+cpu


# Configuration & Mapping

In [32]:
GlobalConfiguration = {
    "GAT": {
        "hiddenChannel": 16,
        "head": 1
    },
    "model": {
        "savePath": "model_params.pth",
        "scaleMin": -127,
        "scaleMax": 127,
    },
    "dataset": {
        "root": "data/Planetoid",
        # "name": "Cora",
        "name": "CiteSeer",
        "normalization": False,
    },
}

MappingModelParam = {
    "conv1.att_src" : "a_src_1",
    "conv1.att_dst" : "a_dst_1",
    "conv1.bias" : "b_1",
    "conv1.lin.weight" : "w_1",
    "conv2.att_src" : "a_src_2",
    "conv2.att_dst" : "a_dst_2",
    "conv2.bias" : "b_2",
    "conv2.lin.weight" : "w_2"
}

# Utility

In [33]:
def tensor_to_list(tensor):
    if not isinstance(tensor, torch.Tensor):
        raise TypeError("Input must be a torch.Tensor")

    # Flatten the tensor and convert to a list
    return tensor.flatten().tolist()

def quantized(tensor, scale_min, scale_max, to_dtype=torch.int8):
    v_max = tensor.max() if tensor.max() != 0 else 1  # Avoid division by zero

    # Scale the tensor
    quantized_tensor = (tensor / v_max) * scale_max
    quantized_tensor = quantized_tensor.clamp(scale_min, scale_max)
    quantized_tensor = quantized_tensor.to(to_dtype)

    # Define a function to scale back to the original range
    def dequantized(quantized_tensor):
        quantized_tensor = quantized_tensor.to(torch.float32)  # Ensure float for computation
        return (quantized_tensor / scale_max) * v_max
    return quantized_tensor, dequantized

def currentOption():
  if torch.cuda.is_available():
    print("GPU is available.")
    print(f"GPU Name: {torch.cuda.get_device_name(0)}")
    print(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")

    try:
        output = subprocess.check_output(
            ["nvidia-smi", "--query-gpu=clocks.gr,clocks.sm,clocks.mem", "--format=csv,noheader,nounits"],
            encoding='utf-8'
        )
        gr, sm, mem = output.strip().split(', ')
        print(f"GPU Graphics Clock: {gr} MHz")
        print(f"GPU SM Clock: {sm} MHz")
        print(f"GPU Memory Clock: {mem} MHz")
    except Exception as e:
        print("Could not fetch GPU frequency. Make sure 'nvidia-smi' is installed.")
  else:
      print("Running on CPU.")

  # CPU Information
  print(f"Processor: {platform.processor()}")
  print(f"CPU Count: {os.cpu_count()}")

  # RAM Information
  virtual_memory = psutil.virtual_memory()
  print(f"Total RAM: {virtual_memory.total / 1e9:.2f} GB")
  print(f"Available RAM: {virtual_memory.available / 1e9:.2f} GB")

  #Frequency
  cpu_freq = psutil.cpu_freq()
  if cpu_freq:
      print(f"CPU Frequency: {cpu_freq.current:.2f} MHz (Max: {cpu_freq.max:.2f} MHz)")


  # Disk Information
  disk_usage = psutil.disk_usage('/')
  print(f"Total Disk Space: {disk_usage.total / 1e9:.2f} GB")
  print(f"Used Disk Space: {disk_usage.used / 1e9:.2f} GB")
  print(f"Free Disk Space: {disk_usage.free / 1e9:.2f} GB")

  # Operating System Information
  print(f"Operating System: {platform.system()} {platform.release()}")
  print(f"Python Version: {platform.python_version()}")

def benchmark_test(model_instance, runs=100, verbose=True):
  times = []
  last_accuracy = None

  for _ in range(runs):
      start_time = time.time()
      last_accuracy = model_instance.test()
      end_time = time.time()
      times.append(end_time - start_time)

  mean_time = np.mean(times)

  if verbose:
      print()
      print(f"✅ Benchmark complete: {runs} runs")
      print(f"⏱️ Mean test time: {round(mean_time*1000)} ms")
      print(f"🎯 Last test accuracy: {last_accuracy*100}%")

  return mean_time, last_accuracy

# Dataset Loader Class

In [34]:
class DatasetLoaderV2:
  def __init__(self,
               root: str = GlobalConfiguration["dataset"]["root"],
               name: str = GlobalConfiguration["dataset"]["name"],
               normalize: int = GlobalConfiguration["dataset"]["normalization"]):
      self.root = root
      self.name = name
      self.normalize = normalize
      self.dataset = self._load_dataset()

  def _load_dataset(self):
      transform = NormalizeFeatures() if self.normalize else None
      return Planetoid(root=self.root, name=self.name, transform=transform)

  def get_data(self, index: int = 0):
      return self.dataset[index]

  def get_dataset(self):
      return self.dataset

  def get_edges(self):
      return self.dataset[0].edge_index

  def get_isolated(self):
      edges = self.get_edges()
      edges_src = edges[0]
      edges_dst = edges[1]
      all_nodes = torch.unique(torch.cat([edges_src, edges_dst]))
      total_nodes = self.get_data().x.shape[0]
      isolated_nodes = [node for node in range(total_nodes) if node not in all_nodes]
      isolated_map = {}
      print(self.get_data().x.shape)
      for node_idx in isolated_nodes:
        isolated_map[node_idx] = self.get_data().x[node_idx]
      return isolated_nodes, isolated_map

# GAT Algorithm Class

In [35]:
class GATV2(torch.nn.Module):
    def __init__(self,
                 data_loader,
                 hidden_channels = GlobalConfiguration["GAT"]["hiddenChannel"],
                 heads = GlobalConfiguration["GAT"]["head"]):
        super().__init__()
        torch.manual_seed(1234567)
        self.conv1 = GATConv(data_loader.get_dataset().num_features, hidden_channels, heads, True)
        self.conv2 = GATConv(heads * hidden_channels, data_loader.get_dataset().num_classes, 1, False)

    def forward(self, x, edge_index):
        p_default = 0.6
        x = F.dropout(x, p=p_default, training=self.training)
        x = self.conv1(x, edge_index)
        x = F.elu(x)
        x = F.dropout(x, p=p_default, training=self.training)
        x = self.conv2(x, edge_index)
        return x

# Model Class

In [36]:
class BuildModelV2():
    def __init__(self, model, data_loader, save_path = GlobalConfiguration["model"]["savePath"]):
        self.model = model
        self.save_path = save_path
        self.data_loader = data_loader
        self.load_model_params()

    def load_model_params(self):
        """Load model parameters from the file if it exists."""
        basePath = "model"
        fullPath = os.path.join(basePath, GlobalConfiguration["dataset"]["name"], self.save_path)
        if os.path.exists(fullPath):
            self.model.load_state_dict(torch.load(fullPath))
            print(f"Model parameters loaded from {fullPath}")
            return True
        else:
            print(f"No saved model parameters found at {fullPath}")
            return False

    # Return:
    # - Dictionary of parameters
    # - Example: {'a_src_1': [...], 'a_dst_1': [...], 'a_1': [...], 'b_1': [...], 'a_src_2': [...], 'a_dst_2': [...], 'a_2': [...], 'b_2': [...]}
    def get_model_params(self):
        result = {}
        param = self.model.state_dict();
        for k, v in param.items():
          quantized_v, _ = quantized(v,
                                        GlobalConfiguration["model"]["scaleMin"],
                                        GlobalConfiguration["model"]["scaleMax"],
                                        torch.int8)
          if quantized_v.ndim == 3:
            quantized_v = quantized_v.reshape(quantized_v.shape[0], -1)
          if k == "conv1.lin.weight" or k == "conv2.lin.weight":
            quantized_v = quantized_v.t()
          result[MappingModelParam[k]] = tensor_to_list(quantized_v)

        result['a_1'] = result['a_src_1'] + result['a_dst_1']
        result['a_2'] = result['a_src_2'] + result['a_dst_2']
        return result

    def test(self, visualization_2D=False, visualization_3D=False):
        self.model.eval()
        data = self.data_loader.get_data()

        # Tính toán đầu ra của mô hình
        out = self.model(data.x, data.edge_index)

        # Sử dụng torch.argmax để lấy chỉ mục có giá trị lớn nhất trong mỗi dòng của tensor out
        pred = torch.argmax(out, dim=1)

        # Sử dụng test_mask để chỉ lấy các phần tử cần so sánh
        correct_predictions = (pred == data.y) & data.test_mask

        # Tính toán độ chính xác
        test_acc = correct_predictions.sum().item() / data.test_mask.sum().item()

        return test_acc


# Main

In [37]:
currentOption()

Running on CPU.
Processor: AMD64 Family 25 Model 68 Stepping 1, AuthenticAMD
CPU Count: 16
Total RAM: 29.73 GB
Available RAM: 11.97 GB
CPU Frequency: 3301.00 MHz (Max: 3301.00 MHz)
Total Disk Space: 701.16 GB
Used Disk Space: 480.31 GB
Free Disk Space: 220.85 GB
Operating System: Windows 10
Python Version: 3.11.4


In [38]:
data_loader_instance = DatasetLoaderV2()
gat_instance = GATV2(data_loader_instance)
model_instance = BuildModelV2(gat_instance, data_loader_instance)

benchmark_test(model_instance, runs=100)
print()

Model parameters loaded from model\CiteSeer\model_params.pth

✅ Benchmark complete: 100 runs
⏱️ Mean test time: 19 ms
🎯 Last test accuracy: 71.6%

