In [42]:
!pip install torch-geometric -q
!pip install rdkit -q

# Creating a GAT Model

Importing required packages

In [43]:
import torch
import torch.nn as nn
import torch.nn.functional as F

from torch_geometric.datasets import PPI
from torch_geometric.nn import MessagePassing
from torch_geometric.utils import add_self_loops, softmax, degree

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Loading and understanding the dataset

We use the PPI Dataset

In [44]:
dataset = PPI(root="./", split="train")
testset = PPI(root="./", split="test")

print(f'Dataset: {dataset}:')
print('======================')
print("Dataset type: ", type(dataset))
print(f'Number of graphs: {len(dataset)}')
print("Number of classes: ", dataset.num_classes)
print("Dataset sample: ", dataset[0])
print("Sample  nodes: ", dataset[0].num_nodes)
print("Sample  edges: ", dataset[0].num_edges)
print("Sample  edge_index: ", dataset[0].edge_index.shape)
print("Sample  X: ", dataset[0].x.shape)
print("Sample  y: ", dataset[0].y.shape)

Dataset: PPI(20):
Dataset type:  <class 'torch_geometric.datasets.ppi.PPI'>
Number of graphs: 20
Number of classes:  121
Dataset sample:  Data(x=[1767, 50], edge_index=[2, 32318], y=[1767, 121])
Sample  nodes:  1767
Sample  edges:  32318
Sample  edge_index:  torch.Size([2, 32318])
Sample  X:  torch.Size([1767, 50])
Sample  y:  torch.Size([1767, 121])


Observing a sample of the data

In [45]:
sample = dataset[0]

print(sample.num_features)
print(sample.edge_index.shape)
print(sample.x.shape)
print(sample.y.shape)

50
torch.Size([2, 32318])
torch.Size([1767, 50])
torch.Size([1767, 121])


# The GAT Model

I've made a simple, single headed GAT model.

In [46]:
class GATLayer(MessagePassing):
  def __init__(self, in_channels, out_channels, dropout=0.2, alpha=0.2):
    super().__init__(aggr="add")

    self.input_channels = in_channels
    self.output_channels = out_channels
    self.dropout = dropout
    self.alpha = alpha

    self.leakyrelu = nn.LeakyReLU(self.alpha)

    # Xavier Initialization of Weights
    self.W = nn.Linear(in_channels, out_channels)
    self.att = nn.Linear(2 * out_channels, 1)
    nn.init.xavier_uniform_(self.W.weight)
    nn.init.xavier_uniform_(self.att.weight)

  def forward(self, x, edge_index):
    Wh = self.W(x)
    out = self.propagate(edge_index, x=Wh)
    return out

  def message(self, edge_index_i, x_i, x_j, size_i):

    # x_cat = Whu||Whv
    x_cat = torch.cat([x_i, x_j], dim=-1)

    # attention = aT.(Whu||Whv)
    attention = self.att(x_cat)
    attention = self.leakyrelu(attention)

    # alpha = softmax(leakyReLU(aT.(Whu||Whv)))
    alpha = softmax(attention, edge_index_i, num_nodes=size_i)

    # Dropout for regularization
    alpha = F.dropout(alpha, self.dropout)

    # Final message passing
    message = alpha * x_j
    return message

class GATModel(nn.Module):
  def __init__(self,in_channels, hidden_channels=64, out_channels, alpha=0.2):
      super().__init__()
      self.alpha = alpha
      self.gat1 = GATLayer(in_channels, hidden_channels, alpha=self.alpha)
      self.gat2 = GATLayer(hidden_channels, hidden_channels, alpha=self.alpha)
      self.fc = nn.Linear(hidden_channels, out_channels)

  def forward(self,x,edge_index):
      x = self.gat1(x,edge_index)
      x = F.leaky_relu(x, self.alpha)
      x = self.gat2(x,edge_index)
      x = F.leaky_relu(x, self.alpha)
      x = self.fc(x)
      return x


Helper function to get the trainable parameters in a model

In [47]:
def print_trainable_params(model):
  """
  Prints the trainable parameters of a PyTorch model.

  Args:
      model: The PyTorch model to inspect.
  """
  for name, param in model.named_parameters():
    if param.requires_grad:
      print(f"Trainable parameter: {name}, size: {param.size()}")


Test the model's forward method and viewing trainable parameters

In [48]:
model= GATModel(in_channels=dataset.num_features, hidden_channels=64, out_channels=dataset.num_classes, alpha=0.2).to(device)
print_trainable_params(model)

sample = dataset[0]

out = model(sample.x, sample.edge_index)
print("\nOutput shape: ", out.shape)


Trainable parameter: gat1.W.weight, size: torch.Size([64, 50])
Trainable parameter: gat1.W.bias, size: torch.Size([64])
Trainable parameter: gat1.att.weight, size: torch.Size([1, 128])
Trainable parameter: gat1.att.bias, size: torch.Size([1])
Trainable parameter: gat2.W.weight, size: torch.Size([64, 64])
Trainable parameter: gat2.W.bias, size: torch.Size([64])
Trainable parameter: gat2.att.weight, size: torch.Size([1, 128])
Trainable parameter: gat2.att.bias, size: torch.Size([1])
Trainable parameter: fc.weight, size: torch.Size([121, 64])
Trainable parameter: fc.bias, size: torch.Size([121])

Output shape:  torch.Size([1767, 121])


### Training loop for the GAT Model

In [49]:
epochs = 10
lr = 1e-3

optimizer = torch.optim.Adam(model.parameters(), lr=lr)
criterion = torch.nn.MSELoss()

model = model.to(device)
criterion = criterion.to(device)

for epoch in range(epochs):
    total_loss = 0
    for data in dataset:
        data = data.to(device)
        x, edge_index, y = data.x.float(), data.edge_index, data.y

        model.train()
        optimizer.zero_grad()
        out = model(x, edge_index)
        loss = criterion(out, y)
        loss.backward()
        total_loss += loss.item()
        optimizer.step()
    print(f'Epoch: {epoch+1}/{epochs}, Loss: {total_loss/len(dataset):.4f}')

Epoch: 1/10, Loss: 0.2833
Epoch: 2/10, Loss: 0.2115
Epoch: 3/10, Loss: 0.1957
Epoch: 4/10, Loss: 0.1921
Epoch: 5/10, Loss: 0.1908
Epoch: 6/10, Loss: 0.1896
Epoch: 7/10, Loss: 0.1882
Epoch: 8/10, Loss: 0.1868
Epoch: 9/10, Loss: 0.1856
Epoch: 10/10, Loss: 0.1845


### Testing loop for the GAT Model

In [50]:
with torch.no_grad():
    model.eval()
    total_loss = 0
    for data in testset:
        data = data.to(device)
        x, edge_index, y = data.x.float(), data.edge_index, data.y
        pred = model(x, edge_index)
        loss = criterion(pred, y)
        total_loss += loss.item()

print(f'Test Loss: {total_loss/len(testset):.4f}')

Test Loss: 0.1787


# Bonus: The GCN Model

In [51]:
class GCNConv(MessagePassing):
    def __init__(self, in_channels, out_channels):
        super().__init__(aggr='add')
        self.lin = nn.Linear(in_channels, out_channels, bias=False)
        self.bias = nn.Parameter(torch.empty(out_channels))

        self.reset_parameters()

    def reset_parameters(self):
        self.lin.reset_parameters()
        self.bias.data.zero_()

    def forward(self, x, edge_index):
        edge_index, _ = add_self_loops(edge_index, num_nodes=x.size(0))
        x = self.lin(x)
        row, col = edge_index
        deg = degree(col, x.size(0), dtype=x.dtype)
        deg_inv_sqrt = deg.pow(-0.5)
        deg_inv_sqrt[deg_inv_sqrt == float('inf')] = 0
        norm = deg_inv_sqrt[row] * deg_inv_sqrt[col]
        out = self.propagate(edge_index, x=x, norm=norm)
        out = out + self.bias

        return out

    def message(self, x_j, norm):
        return norm.view(-1, 1) * x_j


Testing the layer's forward method and viewing trainable parameters

In [52]:
x = sample.x.float()
y = sample.y
edge_index = sample.edge_index

gcn = GCNConv(in_channels=dataset.num_features, out_channels=dataset.num_classes)
print_trainable_params(gcn)

out = gcn(x, edge_index)
print("\nOutput shape: ", out.shape)

Trainable parameter: bias, size: torch.Size([121])
Trainable parameter: lin.weight, size: torch.Size([121, 50])

Output shape:  torch.Size([1767, 121])


Creating a 2 layer GCN Model

In [53]:
class GCNModel(torch.nn.Module):
    """Graph Convolutional Layer (GCN)"""
    def __init__(self, in_channels, out_channels):
        super(GCNModel, self).__init__()
        self.conv1 = GCNConv(in_channels, 128)
        self.conv2 = GCNConv(128, 64)
        self.fc = nn.Linear(64, out_channels)

    def forward(self, x, edge_index):
        """
        Paramters:
        x (Tensor):
            Node feature matrix
        edge_index (LongTensor):
            Graph edge connectivity
        """
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = self.conv2(x, edge_index)
        x = F.relu(x)
        x = self.fc(x)

        return F.log_softmax(x, dim=1)

Testing the model's forward method and viewing trainable parameters

In [54]:
x = sample.x.float()
y = sample.y
edge_index = sample.edge_index

model = GCNModel(in_channels=dataset.num_features, out_channels=dataset.num_classes)
print_trainable_params(model)

output = model(x, edge_index)
print("\nOutput shape: ", output.shape)

Trainable parameter: conv1.bias, size: torch.Size([128])
Trainable parameter: conv1.lin.weight, size: torch.Size([128, 50])
Trainable parameter: conv2.bias, size: torch.Size([64])
Trainable parameter: conv2.lin.weight, size: torch.Size([64, 128])
Trainable parameter: fc.weight, size: torch.Size([121, 64])
Trainable parameter: fc.bias, size: torch.Size([121])

Output shape:  torch.Size([1767, 121])


### Training loop for the GCN Model

In [55]:
epochs = 10
lr = 1e-3

optimizer = torch.optim.Adam(model.parameters(), lr=lr)
criterion = torch.nn.MSELoss()

model = model.to(device)
criterion = criterion.to(device)

for epoch in range(epochs):
    total_loss = 0
    for data in dataset:
        data = data.to(device)
        x, edge_index, y = data.x.float(), data.edge_index, data.y
        model.train()
        optimizer.zero_grad()
        out = model(x, edge_index)
        loss = criterion(out, y)
        loss.backward()
        total_loss += loss.item()
        optimizer.step()
    print(f'Epoch: {epoch+1}/{epochs}, Loss: {total_loss/len(dataset):.4f}')

Epoch: 1/10, Loss: 26.3399
Epoch: 2/10, Loss: 26.3216
Epoch: 3/10, Loss: 26.3180
Epoch: 4/10, Loss: 26.3166
Epoch: 5/10, Loss: 26.3158
Epoch: 6/10, Loss: 26.3153
Epoch: 7/10, Loss: 26.3149
Epoch: 8/10, Loss: 26.3147
Epoch: 9/10, Loss: 26.3144
Epoch: 10/10, Loss: 26.3142


### Testing loop for the GCN Model

In [56]:
with torch.no_grad():
    model.eval()
    total_loss = 0
    for data in testset:
        data = data.to(device)
        x, edge_index, y = data.x.float(), data.edge_index, data.y
        pred = model(x, edge_index)
        loss = criterion(pred, y)
        total_loss += loss.item()

print(f'Test Loss: {total_loss/len(testset):.4f}')

Test Loss: 26.1734


# Conclusion

We can observe that the GAT model performs much better than the GCN model. This is because GAT model uses attention mechanism to give more importance to the nodes that are more relevant, which can significantly improve the model's regression capabilities.