In [1]:
import torch
from torch import Tensor
from torch.nn import Linear, ReLU, Sequential
import torch.nn.functional as F
from torch_geometric.data import HeteroData
from torch_geometric.loader import LinkNeighborLoader
from torch_geometric.nn import HeteroConv, GATConv, SAGEConv, Linear
from torch_geometric.nn.aggr import Aggregation, MultiAggregation
from torch_geometric.nn.conv import MessagePassing
from torch_geometric.typing import OptPairTensor, Adj, Size
import os.path as path
import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix
import tqdm
from typing import Optional, Union, List, Tuple

In [2]:
data_folder = "ds/"
train_hd = "train_hd_2020_3_0.1.pt"

In [3]:
data = torch.load(path.join(data_folder, train_hd))

data.validate()

  data = torch.load(path.join(data_folder, train_hd))


True

In [4]:
artist_channels = data["artist"].x.size(1)
track_channels = data["track"].x.size(1)
tag_channels = data["tag"].x.size(1)

print(f"Artist channels: {artist_channels}")
print(f"Track channels: {track_channels}")
print(f"Tag channels: {tag_channels}")

Artist channels: 16
Track channels: 5
Tag channels: 24


In [5]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Device: '{device}'")

Device: 'cuda'


In [6]:
compt_tree_size = [25, 20]

print("Creating train_loader...")
train_loader = LinkNeighborLoader(
    data=data,
    num_neighbors=compt_tree_size,
    neg_sampling_ratio=1,
    edge_label_index=("artist", "collab_with", "artist"),
    batch_size=128,
    shuffle=True,
    num_workers=10,
    pin_memory=True,
)

Creating train_loader...




In [7]:
class MLPSAGEConv(MessagePassing):
    r"""The GraphSAGE operator with an MLP instead of the linear transformation.

    Args:
        in_channels (int or tuple): Size of each input sample, or :obj:`-1` to
            derive the size from the first input(s) to the forward method.
            A tuple corresponds to the sizes of source and target
            dimensionalities.
        out_channels (int): Size of each output sample.
        hidden_channels (int, optional): Size of the hidden layer in the MLP.
            (default: :obj:`64`)
        aggr (str or Aggregation, optional): The aggregation scheme to use.
            Any aggregation of :obj:`torch_geometric.nn.aggr` can be used,
            *e.g.*, :obj:`"mean"`, :obj:`"max"`, or :obj:`"lstm"`.
            (default: :obj:`"mean"`)
        normalize (bool, optional): If set to :obj:`True`, output features
            will be :math:`\ell_2`-normalized, *i.e.*,
            :math:`\frac{\mathbf{x}^{\prime}_i}
            {\| \mathbf{x}^{\prime}_i \|_2}`.
            (default: :obj:`False`)
        root_weight (bool, optional): If set to :obj:`False`, the layer will
            not add transformed root node features to the output.
            (default: :obj:`True`)
        bias (bool, optional): If set to :obj:`False`, the layer will not learn
            an additive bias. (default: :obj:`True`)
        **kwargs (optional): Additional arguments of
            :class:`torch_geometric.nn.conv.MessagePassing`.

    Shapes:
        - **inputs:**
          node features :math:`(|\mathcal{V}|, F_{in})` or
          :math:`((|\mathcal{V_s}|, F_{s}), (|\mathcal{V_t}|, F_{t}))`
          if bipartite,
          edge indices :math:`(2, |\mathcal{E}|)`
        - **outputs:** node features :math:`(|\mathcal{V}|, F_{out})` or
          :math:`(|\mathcal{V_t}|, F_{out})` if bipartite
    """

    def __init__(
        self,
        in_channels: Union[int, Tuple[int, int]],
        out_channels: int,
        hidden_channels: int = 64,
        aggr: Optional[Union[str, List[str], Aggregation]] = "mean",
        normalize: bool = False,
        root_weight: bool = True,
        bias: bool = True,
        **kwargs,
    ):
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.hidden_channels = hidden_channels
        self.normalize = normalize
        self.root_weight = root_weight

        if isinstance(in_channels, int):
            in_channels = (in_channels, in_channels)

        if aggr == "lstm":
            kwargs.setdefault("aggr_kwargs", {})
            kwargs["aggr_kwargs"].setdefault("in_channels", in_channels[0])
            kwargs["aggr_kwargs"].setdefault("out_channels", in_channels[0])

        super().__init__(aggr, **kwargs)


        self.mlp = Sequential(
            Linear(in_channels[0], hidden_channels),
            ReLU(),
            Linear(hidden_channels, in_channels[0]) # Output size should match input for aggregation
        )

        if isinstance(self.aggr_module, MultiAggregation):
            aggr_out_channels = self.aggr_module.get_out_channels(in_channels[0])
        else:
            aggr_out_channels = in_channels[0]

        self.lin_l = Linear(aggr_out_channels, out_channels, bias=bias)
        if self.root_weight:
            self.lin_r = Linear(in_channels[1], out_channels, bias=False)

        self.reset_parameters()

    def reset_parameters(self):
        super().reset_parameters()
        for layer in self.mlp:
            if hasattr(layer, 'reset_parameters'):
                layer.reset_parameters()
        self.lin_l.reset_parameters()
        if self.root_weight:
            self.lin_r.reset_parameters()

    def forward(
        self,
        x: Union[Tensor, OptPairTensor],
        edge_index: Adj,
        size: Size = None,
    ) -> Tensor:

        if isinstance(x, Tensor):
            x = (x, x)

        # Propagate through MLP
        x = (self.mlp(x[0]), x[1])

        # propagate_type: (x: OptPairTensor)
        out = self.propagate(edge_index, x=x, size=size)
        out = self.lin_l(out)

        x_r = x[1]
        if self.root_weight and x_r is not None:
            out = out + self.lin_r(x_r)

        if self.normalize:
            out = F.normalize(out, p=2.0, dim=-1)

        return out

    def message(self, x_j: Tensor) -> Tensor:
        return x_j

    def message_and_aggregate(self, adj_t: Adj, x: OptPairTensor) -> Tensor:
        if isinstance(adj_t, SparseTensor):
            adj_t = adj_t.set_value(None, layout=None)
        return spmm(adj_t, x[0], reduce=self.aggr)

    def __repr__(self) -> str:
        return (
            f"{self.__class__.__name__}({self.in_channels}, "
            f"{self.out_channels}, hidden_channels={self.hidden_channels}, aggr={self.aggr})"
        )



In [8]:
class GNN(torch.nn.Module):
    def __init__(self, metadata, hidden_channels, out_channels):
        super().__init__()
        self.metadata = metadata
        self.out_channels = out_channels

        self.conv1 = HeteroConv({
            ("artist", "collab_with", "artist"): GATConv((artist_channels, artist_channels), hidden_channels),
            ("artist", "has_tag_artists", "tag"): MLPSAGEConv((artist_channels, tag_channels), hidden_channels, hidden_channels * 2),
            ("artist", "last_fm_match", "artist"): GATConv((artist_channels, artist_channels), hidden_channels),
            ("track", "has_tag_tracks", "tag"): MLPSAGEConv((track_channels, tag_channels), hidden_channels, hidden_channels * 2),
            ("artist", "linked_to", "artist"): GATConv((artist_channels, artist_channels), hidden_channels),
            ("artist", "musically_related_to", "artist"): GATConv((artist_channels, artist_channels), hidden_channels),
            ("artist", "personally_related_to", "artist"): GATConv((artist_channels, artist_channels), hidden_channels),
            ("tag", "tags_artists", "artist"): MLPSAGEConv((tag_channels, artist_channels), hidden_channels, hidden_channels * 2),
            ("tag", "tags_tracks", "track"): MLPSAGEConv((tag_channels, track_channels), hidden_channels, hidden_channels * 2),
            ("track", "worked_by", "artist"): MLPSAGEConv((track_channels, artist_channels), hidden_channels, hidden_channels * 2),
            ("artist", "worked_in", "track"): MLPSAGEConv((artist_channels, track_channels), hidden_channels, hidden_channels * 2),
        }, aggr="mean")

        self.conv2 = HeteroConv({
            ("artist", "collab_with", "artist"): GATConv((hidden_channels, hidden_channels), hidden_channels),
            ("artist", "has_tag_artists", "tag"): MLPSAGEConv((hidden_channels, hidden_channels), hidden_channels, hidden_channels * 2),
            ("artist", "last_fm_match", "artist"): GATConv((hidden_channels, hidden_channels), hidden_channels),
            ("track", "has_tag_tracks", "tag"): MLPSAGEConv((hidden_channels, hidden_channels), hidden_channels, hidden_channels * 2),
            ("artist", "linked_to", "artist"): GATConv((hidden_channels, hidden_channels), hidden_channels),
            ("artist", "musically_related_to", "artist"): GATConv((hidden_channels, hidden_channels), hidden_channels),
            ("artist", "personally_related_to", "artist"): GATConv((hidden_channels, hidden_channels), hidden_channels),
            ("tag", "tags_artists", "artist"): MLPSAGEConv((hidden_channels, hidden_channels), hidden_channels, hidden_channels * 2),
            ("tag", "tags_tracks", "track"): MLPSAGEConv((hidden_channels, hidden_channels), hidden_channels, hidden_channels * 2),
            ("track", "worked_by", "artist"): MLPSAGEConv((hidden_channels, hidden_channels), hidden_channels, hidden_channels * 2),
            ("artist", "worked_in", "track"): MLPSAGEConv((hidden_channels, hidden_channels), hidden_channels, hidden_channels * 2),
        }, aggr="mean")

        self.linear1 = Linear(hidden_channels * 2, hidden_channels * 4)
        self.linear2 = Linear(hidden_channels * 4, out_channels)

    def forward(self, x_dict, edge_index_dict):
        x_dict1 = self.conv1(x_dict, edge_index_dict)
        x_dict2 = self.conv2(x_dict1, edge_index_dict)

        x_artist = torch.cat([x_dict1['artist'], x_dict2['artist']], dim=-1)

        x_artist = self.linear1(x_artist)
        x_artist = self.linear2(x_artist)

        # Normalize the artist node features
        x_artist = F.normalize(x_artist, p=2, dim=-1)

        # Update the dictionary with the new 'artist' features, leaving other nodes unchanged
        x_dict['artist'] = x_artist

        return x_dict

In [9]:


def train(model, train_loader, optimizer, criterion, device, num_epochs, val_epochs):
    for epoch in range(num_epochs):
        model.train()  # Set model to training mode
        epoch_loss = 0.0
        
        for sampled_data in tqdm.tqdm(train_loader):
            # Move data to device
            sampled_data = sampled_data.to(device)
            
            # Forward pass
            pred_dict = model(sampled_data.x_dict, sampled_data.edge_index_dict)
            
            # Get predictions and labels for the 'collab_with' edge type
            edge_label_index = sampled_data['artist', 'collab_with', 'artist'].edge_label_index
            edge_label = sampled_data['artist', 'collab_with', 'artist'].edge_label

            src_emb = pred_dict['artist'][edge_label_index[0]]  # Source node embeddings
            dst_emb = pred_dict['artist'][edge_label_index[1]]  # Destination node embeddings
            
            # Compute the dot product between source and destination embeddings
            preds = (src_emb * dst_emb).sum(dim=-1)  # Scalar for each edge
            
            # Compute loss
            loss = criterion(preds, edge_label.float())
            epoch_loss += loss.item()
            
            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        
        # Average loss for the epoch
        epoch_loss /= len(train_loader)
        print(f"Epoch {epoch+1}/{num_epochs}, Training Loss: {epoch_loss:.4f}")

        # Validation?
        if (epoch + 1) % val_epochs != 0:
            continue

        print("Computing validation metrics")
        
        # Validation metrics
        model.eval()  # Set model to evaluation mode
        all_labels = []
        all_probs = []
        
        with torch.no_grad():  # Disable gradient computation for validation
            for sampled_data in tqdm.tqdm(train_loader):
                # Move data to device
                sampled_data = sampled_data.to(device)
                
                # Forward pass
                pred_dict = model(sampled_data.x_dict, sampled_data.edge_index_dict)
                
                # Get predictions and labels for the 'collab_with' edge type
                edge_label_index = sampled_data['artist', 'collab_with', 'artist'].edge_label_index
                edge_label = sampled_data['artist', 'collab_with', 'artist'].edge_label

                src_emb = pred_dict['artist'][edge_label_index[0]]  # Source node embeddings
                dst_emb = pred_dict['artist'][edge_label_index[1]]  # Destination node embeddings
                
                # Compute the dot product between source and destination embeddings
                preds = (src_emb * dst_emb).sum(dim=-1)  # Scalar for each edge

                probs = torch.sigmoid(preds)  # Convert to probabilities
                
                # Collect predictions, probabilities, and labels
                all_labels.append(edge_label.cpu())
                all_probs.append(probs.cpu())
        
        # Concatenate all predictions and labels
        all_labels = torch.cat(all_labels)
        all_probs = torch.cat(all_probs)

        # Find threshold for predictions
        print("Looking for threshold")
        best_threshold = 0
        best_f1 = 0
        for threshold in tqdm.tqdm(np.arange(0.2, 0.81, 0.01)):
            preds_binary = (all_probs > threshold).long()
            cm = confusion_matrix(all_labels, preds_binary)
            tp = cm[1, 1]
            fp = cm[0, 1]
            fn = cm[1, 0]
            tn = cm[0, 0]
            precision = 0 if tp == 0 else tp / (tp + fp)
            recall = 0 if tp == 0 else tp / (tp + fn)
            f1 = 0 if precision * recall == 0 else 2 * precision * recall / (precision + recall)
            if f1 > best_f1:
                best_threshold = threshold
                best_f1 = f1
        print(f"Best threshold: {best_threshold}")
        all_preds = (all_probs > best_threshold).long()
        
        # Compute metrics
        cm = confusion_matrix(all_labels, all_preds)
        tp = cm[1, 1]
        fp = cm[0, 1]
        fn = cm[1, 0]
        tn = cm[0, 0]
        accuracy = (tp + tn) / (tp + fp + fn + tn)
        precision = tp / (tp + fp)
        recall = tp / (tp + fn)
        f1 = 2 * precision * recall / (precision + recall)
        roc_auc = roc_auc_score(all_labels, all_probs)
        
        # Print validation metrics
        print(f"Validation Metrics - Epoch {epoch+1}/{num_epochs}:")
        print(f"Accuracy:  {accuracy:.4f}")
        print(f"Precision: {precision:.4f}")
        print(f"Recall:    {recall:.4f}")
        print(f"F1-score:  {f1:.4f}")
        print(f"ROC-AUC:   {roc_auc:.4f}")
        print(f"Confusion Matrix:\n{tp} {fn}\n{fp} {tn}")

    return best_threshold


In [10]:
model = GNN(metadata=data.metadata(), hidden_channels=64, out_channels=64).to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

best_threshold = train(
    model,
    train_loader,
    optimizer,
    F.binary_cross_entropy_with_logits,
    device,
    3,
    3
)


100%|██████████| 12516/12516 [32:31<00:00,  6.41it/s]


Epoch 1/3, Training Loss: 0.5234


100%|██████████| 12516/12516 [31:45<00:00,  6.57it/s]


Epoch 2/3, Training Loss: 0.5197


100%|██████████| 12516/12516 [33:34<00:00,  6.21it/s]


Epoch 3/3, Training Loss: 0.5194
Computing validation metrics


100%|██████████| 12516/12516 [23:43<00:00,  8.79it/s]


Looking for threshold


100%|██████████| 62/62 [06:45<00:00,  6.54s/it]


Best threshold: 0.6600000000000004
Validation Metrics - Epoch 3/3:
Accuracy:  0.9316
Precision: 0.9013
Recall:    0.9695
F1-score:  0.9341
ROC-AUC:   0.9655
Confusion Matrix:
1553058 48918
170082 1431894


In [11]:
torch.save(model.state_dict(), "./model_2020_3_0.1.pth")