In [1]:
import torch
from torch.nn import Linear, ModuleList, functional as F, init
from torch_geometric.utils import softmax
from torch_geometric.nn import MessagePassing

from torch.optim import Adam
from torch_geometric.datasets import Planetoid

import matplotlib.pyplot as plt
from sklearn.manifold import TSNE


In [2]:


class GraphAttentionLayer(MessagePassing):
    def __init__(
        self, _in: int, _out: int, heads: int, conct: bool = True, dropout: float = 0.6
    ):
        super().__init__(aggr="add")
        self._in = _in
        self._out = _out
        self.heads = heads
        self.dropout = dropout
        self.conct = conct

        # Initialize weights for each head
        self.weights = ModuleList([Linear(_in, _out, bias=True) for _ in range(heads)])
        self.attention = ModuleList(
            [Linear(2 * _out, 1, bias=True) for _ in range(heads)]
        )
        def reset_parameters(self):
            init.xavier_uniform_(self.weight)
            init.xavier_uniform_(self.a)
    def forward(self, x: torch.Tensor, edge_index: torch.Tensor) -> torch.Tensor:
        emb_head = []
        # Independently calculate node embeddings for each head
        for head in range(self.heads):
            x_transf = self.weights[head](x)
            emb = self.propagate(edge_index, x=x_transf, head=head)
            emb_head.append(emb)

        if self.conct:
            out = torch.cat(emb_head, dim=-1)  # Concatenate
        else:
            out = torch.mean(torch.stack(emb_head), dim=0)  # Average
        return out  # (N, heads * out_features)

    def message(
        self, x_j: torch.Tensor, x_i: torch.Tensor, edge_index: torch.Tensor, head: int
    ) -> torch.Tensor:
        feat = torch.cat([x_j, x_i], dim=-1)  # (E, 2 * F')
        e = self.attention[head](feat)  # Attention coefficients
        e = F.leaky_relu(e, negative_slope=0.2)
        norm_e = softmax(
            e, index=edge_index[1], dim=0
        )  # Normalize by destination nodes
        # Optional dropout on attention weights
        # norm_e = F.dropout(norm_e, p=self.dropout, training=self.training)
        return norm_e * x_j  # Weighted features

    def update(self, aggr_out: torch.Tensor) -> torch.Tensor:
        return aggr_out  # No activation in the update for now


In [None]:
class GATTransductive(torch.nn.Module):
    def __init__(self, in_dim, hid_dim, out_dim):
        super().__init__()
        self.gat_1 = GraphAttentionLayer(in_dim, hid_dim, heads=8, conct=True)
        self.gat_2 = GraphAttentionLayer(hid_dim * 8, out_dim, heads=1, conct=False)

    def forward(self, x, edge_index):
        x = F.dropout(x, p=0.6, training=self.training)
        x = self.gat_1(x, edge_index)
        x = F.elu(x)
        x = F.dropout(x, p=0.6, training=self.training)
        x = self.gat_2(x, edge_index)
        x = F.softmax(x, dim=-1)
        return x



In [None]:
# dataset = Planetoid(root="/home/a373k/graph/gat-model/", name="Cora")
# data = dataset[0]  # Load the first graph object in the dataset
# model = GATinductive(
#     in_dim=data.num_node_features, hid_dim=8, out_dim=dataset.num_classes
# )
# optimizer = Adam(model.parameters(), lr=0.005, weight_decay=5e-4)


In [None]:
def train(model, data, optimizer):
    model.train()
    out = model(data.x, data.edge_index)
    loss = F.cross_entropy(out[data.train_mask], data.y[data.train_mask])
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss


def test(model, data, optimizer):
    model.eval()
    out = model(data.x, data.edge_index)
    pred = out.argmax(dim=1)
    accs = []
    for mask in [data.train_mask, data.val_mask, data.test_mask]:
        correct = pred[mask] == data.y[mask]  # Check against ground-truth labels.
        accs.append(
            int(correct.sum()) / int(mask.sum())
        )  # Derive ratio of correct predictions.
    return accs

In [None]:



# Function to perform t-SNE and save the plot
def save_tsne_plot(model, data, num_classes, dataset_name):
    model.eval()

    # Extract features from the first hidden layer of GAT
    with torch.no_grad():
        x = data.x
        edge_index = data.edge_index
        features = model.gat_1(x, edge_index)  # First GAT layer
        features = features.detach().cpu().numpy()  # Convert to numpy for t-SNE

    # Apply t-SNE for 2D visualization
    tsne = TSNE(n_components=2, random_state=42)
    features_tsne = tsne.fit_transform(features)

    # Get ground truth labels (node classes)
    labels = data.y.cpu().numpy()

    # Create scatter plot
    plt.figure(figsize=(8, 8))
    for i in range(num_classes):
        plt.scatter(
            features_tsne[labels == i, 0],
            features_tsne[labels == i, 1],
            label=f"Class {i}",
        )

    plt.legend()
    plt.title(f"t-SNE of GAT First Hidden Layer Features ({dataset_name})")
    save_path = f"tsne_{dataset_name}.png"
    plt.savefig(save_path)
    plt.close()  # Close the plot to avoid overlapping issues
    print(f"Saved t-SNE plot for {dataset_name} at {save_path}")


# List of datasets
dataset_names = ["Cora", "CiteSeer", "PubMed"]

# Loop through each dataset
for name in dataset_names:
    dataset = Planetoid(root=f"./{name}_dataset/", name=name)
    data = dataset[0]

    # Initialize the model (adjust dimensions as per your implementation)
    model = GATTransductive(
        in_dim=data.num_node_features, hid_dim=8, out_dim=dataset.num_classes
    )

    # Train the model (you can use your own training loop here)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.005, weight_decay=5e-4)
    print(f"{name} results")
    for epoch in range(1, 101):
        loss = train(model, data, optimizer)
        acc = test(model, data, optimizer)
        
        print(f"epoch:{epoch}=> trainLoss:{loss:.4f}, trainAcc:{acc[0]:.3f}, valAcc:{acc[1]:.3f}, testAcc:{acc[2]:.3f}")

    # Save the t-SNE plot for this dataset
    save_tsne_plot(model, data, num_classes=dataset.num_classes, dataset_name=name)


In [3]:
class GATInductive(torch.nn.Module):
    def __init__(self, in_dim, hid_dim, out_dim):
        super().__init__()
        # First GAT layer with 4 attention heads and 256 features per head
        self.gat_1 = GraphAttentionLayer(
            in_dim, hid_dim, heads=4, conct=True
        )  # Output: hid_dim * 4
        # Second GAT layer with 4 attention heads and 256 features per head
        self.gat_2 = GraphAttentionLayer(
            hid_dim * 4, hid_dim, heads=4, conct=True
        )  # Output: hid_dim * 4
        # Third GAT layer with 6 attention heads for multi-label classification
        self.gat_3 = GraphAttentionLayer(
            hid_dim * 4, out_dim, heads=6, conct=False
        )  # Output: out_dim
        # Skip connection layers
        self.skip_1 = torch.nn.Linear(in_dim, hid_dim * 4)
        self.skip_2 = torch.nn.Linear(hid_dim * 4, hid_dim * 4)

    def forward(self, x, edge_index):
        # Layer 1 with skip connection
        skip_x = self.skip_1(x)
        x = F.elu(self.gat_1(x, edge_index)) + skip_x

        # Layer 2 with skip connection
        skip_x = self.skip_2(x)
        x = F.elu(self.gat_2(x, edge_index)) + skip_x

        # Layer 3 (output layer with sigmoid activation for multi-label classification)
        x = self.gat_3(x, edge_index)
        return torch.sigmoid(x)


In [4]:
from torch_geometric.datasets import PPI
from torch_geometric.loader import DataLoader

# Load the PPI dataset
train_dataset = PPI(root="PPI/train_mask", split="train")
val_dataset = PPI(root="PPI/val_mask", split="val")
test_dataset = PPI(root="PPI/test_mask", split="test")

# Create data loaders for mini-batching (batch size = 2 as per the specification)
train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=2, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=2, shuffle=False)


Downloading https://data.dgl.ai/dataset/ppi.zip
Extracting PPI/train_mask/ppi.zip
Processing...
Done!
Downloading https://data.dgl.ai/dataset/ppi.zip
Extracting PPI/val_mask/ppi.zip
Processing...
Done!
Downloading https://data.dgl.ai/dataset/ppi.zip
Extracting PPI/test_mask/ppi.zip
Processing...
Done!


In [None]:
model = GATInductive(in_dim=50, hid_dim=64, out_dim=121)
# Define optimizer and loss function
optimizer = Adam(model.parameters(), lr=0.005)
criterion = torch.nn.BCELoss()  # Binary Cross-Entropy for multi-label classification


# Training loop
def train():
    model.train()
    total_loss = 0
    for data in train_loader:
        optimizer.zero_grad()
        out = model(data.x, data.edge_index)  # Forward pass
        loss = criterion(out, data.y)  # Compute loss
        loss.backward()  # Backward pass
        optimizer.step()  # Update weights
        total_loss += loss.item()
    
    return total_loss / len(train_loader.dataset)


# Validation/Test loop
def evaluate(loader):
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for data in loader:
            out = model(data.x, data.edge_index)
            loss = criterion(out, data.y)
            total_loss += loss.item()
    return total_loss / len(loader.dataset)


# Main training process
for epoch in range(1, 101):
    train_loss = train()
    val_loss = evaluate(val_loader)
    print(
        f"Epoch {epoch:03d} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}"
    )
test_loss = evaluate(test_loader)
print(f"Test Loss: {test_loss:.4f}")
