In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import dgl
import dgl.function as fn

Using backend: pytorch


# Message passing framework:

![](assets/img/05_dgl_message_passing_1.png) 



Три основных функции:
1. `M` (message): принимает единственный аргумент `edges: EdgeBatch`, содержащий набор ребер (`DGL` неявно от нас разбивает граф на такие наборы). У `edges` есть 3 атрибута: `src` (фичи начальных узлов), `dst` (фичи конечных узлов) и `data` (фичи самих ребер)
2. $\sum$ (reduce):  принимает единственный аргумент `nodes: NodeBatch`, содержащий набор узлов. У `nodes` есть 1 атрибут `mailbox` для доступа к сообщениям, полученным от других узлов
3. `U` (update): тоже принимает единственный аргумент `nodes: NodeBatch`. Эта функция оперирует с агрегатов из $\sum$, обычно сочетая его с исходным представлением узла для генерации и сохранения нового представления

`DGL` реализует необходимый минимум в message и reduce функций в модуле `dql.function`. Если этого не хватает, можно определить и собственные функции.

Встроенные функции сообщений могут быть унарными или бинарными. Унарная функция - это `copy`; бинарные - `add`, `sub`, `mul`, `div` и `dot`. Далее в названиях идут буквы согласно соглашению: `u` - для `src` узлов; `v` для `dst` и `e` для `edges`.

Встроенные функции свертки: `sum`, `max`, `min`, `mean`. 


Замечание: на практике наблюдаю, что стандартный обход соседей подразумевает входящие связи (in-neighbors)

In [None]:
# fn.u_add_v('hu', 'hv', 'he')
def u_add_v(edges):
    return {'he': edges.src['hu'] + edges.dst['hv']}

# fn.sum('m', 'h')
def sum_(nodes):
    return {'h': nodes.mailbox['m'].sum(dim=1)}

Если необходимо провести расчеты на ребрах без рассылки сообщений, то можно использовать метод `apply_edges`. 

```
G.apply_edges(fn.u_add_v('el', 'er', 'e'))
```

`update_all` - это верхнеуровневое API, объединяющее генерацию сообщений, агрегацию сообщений и обновление узлов. 

`update_all` может принимать 3 аргумента: функцию сообщения, функцию свертки и функцию обновления. 

В целях улучшения читаемости кода авторы рекомендуют не указывать функцию обновления здесь, а вызвать ее отдельно, т.к. обычно она состоит из простого присваивания и так легче понять, что происходит.

```
# сообщение m: перемножить фичи ft начальных узлов и фичами a на ребрах
# свертка: суммировать сообщения m и сохранить результат в .ndata['ft']
G.update_all(fn.u_mul_e('ft', 'a', 'm'),
                    fn.sum('m', 'ft'))
# вызвать функцию обновления отдельно
G.data['final_ft'] = G.ndata['ft'] * 2
```

Замечания по эффективности:
1. Лучший вариант - использовать встроенные функции
2. Обычно связей намного больше, чем узлов, так что чем меньше сообщений хранится на связях, тем лучше. 

Пример: вместо операции $W \times (u||v)$ лучше использовать операции $W_l \times u + W_r \times v$, которые эквивалентны, но на ребрах не приходится хранить длинный вектор $(u||v)$


Если требуется обновить только часть узлов, следует создать подграф на основе этих узлов и применить `update_all` к нему.

```
nid = [0, 2, 3, 6, 7, 9]
sg = g.subgraph(nid)
sg.update_all(message_func, reduce_func)
```

MPF может быть применен и к гетерографам по следующему принципу:
1. Для каждого отношения выполнить расчет сообщений и агрегацию
2. Для каждого типа узла объединить результаты, полученные на различных отношениях

`multi_update_all` принимает на вход словарь с параметрами для `update_all` на каждое отношение и "cross type" функцию свертки

В случае гетерографа методу `apply_edges` можно передать тип ребер, для которых требуется выполнить вычисления:
```
G.apply_edges(fn.u_dot_v('h', 'h', 'score'), etype=etype)
```

# Реализация GraphSage на DGL:

![](assets/img/05_dgl_message_passing_2.png) 

In [3]:
class SAGEConv(nn.Module):
    def __init__(self, n_inputs, n_outputs):
        super().__init__()
        self.linear = nn.Linear(2 * n_inputs, n_outputs)

    def forward(self, G, h):
        # G.local_scope() означает, что любые out-place изменения фичей узлов или ребер
        # не будут видны за пределами контекста
        # (inplace операции будут отражены за пределами контекста!)
        with G.local_scope():
            G.ndata['h'] = h
            # 1 строка SAGE
            # update_all делает описываемые операции для всех узлов/ребер
            G.update_all(message_func=fn.copy_u('h', 'm'),  # фичи h -> сообщения m
                         reduce_func=fn.mean('m', 'h_N')) # среднее по сообщениям m -> h_N
            h_N = G.ndata['h_N']
            # 2 строка SAGE
            h_total = torch.cat([h, h_N], dim=1) 
            return self.linear(h_total)

Протестируем наш модуль (код взят из 03_dgl_node_classification; вместе `gnn.GraphConv` используем кастомный `SAGEConv`)

In [4]:
class GCN(nn.Module):
    def __init__(self, n_input, n_hidden, n_output):
        super().__init__()
        self.conv1 = SAGEConv(n_input, n_hidden)
        self.conv2 = SAGEConv(n_hidden, n_output)

    def forward(self, G, in_features):
        out = F.relu(self.conv1(G, in_features))
        out = self.conv2(G, out)
        return out


In [5]:
from utils import train_cora_node_classification

In [6]:
dataset = dgl.data.CoraGraphDataset()
G = dataset[0]

n_input = G.ndata['feat'].shape[1]
n_hidden = 16
n_out = dataset.num_classes
n_epochs = 100

model = GCN(n_input, n_hidden, n_out)
train_cora_node_classification(model, G)

  NumNodes: 2708
  NumEdges: 10556
  NumFeats: 1433
  NumClasses: 7
  NumTrainingSamples: 140
  NumValidationSamples: 500
  NumTestSamples: 1000
Done loading data from cached files.
In epoch 0, loss: 1.949, val acc: 0.156 (best 0.156), test acc: 0.144 (best 0.144)
In epoch 5, loss: 1.865, val acc: 0.286 (best 0.286), test acc: 0.283 (best 0.283)
In epoch 10, loss: 1.713, val acc: 0.306 (best 0.332), test acc: 0.317 (best 0.338)
In epoch 15, loss: 1.488, val acc: 0.436 (best 0.436), test acc: 0.405 (best 0.405)
In epoch 20, loss: 1.208, val acc: 0.522 (best 0.522), test acc: 0.528 (best 0.528)
In epoch 25, loss: 0.903, val acc: 0.626 (best 0.626), test acc: 0.609 (best 0.609)
In epoch 30, loss: 0.616, val acc: 0.684 (best 0.684), test acc: 0.706 (best 0.706)
In epoch 35, loss: 0.384, val acc: 0.758 (best 0.758), test acc: 0.762 (best 0.762)
In epoch 40, loss: 0.228, val acc: 0.772 (best 0.774), test acc: 0.774 (best 0.772)
In epoch 45, loss: 0.134, val acc: 0.770 (best 0.774), test acc:

In [23]:
class WeightedSAGEConv(nn.Module):
    def __init__(self, n_inputs, n_outputs):
        super().__init__()
        self.linear = nn.Linear(2 * n_inputs, n_outputs)

    def forward(self, G, h):
        # G.local_scope() означает, что любые out-place изменения фичей узлов или ребер
        # не будут видны за пределами контекста
        # (inplace операции будут отражены за пределами контекста!)
        with G.local_scope():
            G.ndata['h'] = h
            G.edata['w'] = G.edata['weight']
            # 1 строка SAGE
            # update_all делает описываемые операции для всех узлов/ребер
            G.update_all(message_func=fn.u_mul_e('h', 'w', 'm'),  # фичи h * веса входящих ребер w -> сообщения m
                         reduce_func=fn.mean('m', 'h_N'))  # среднее по сообщениям m -> h_N

            # мне не нравится, что так нет нормализации по весам
            # # сумма весов входящих ребер
            # G.update_all(fn.copy_e('w', 'm'), fn.sum('m', 'W'))
            # # получение нормализованных весов
            # # второй вариант эквивалентен первому
            # # g.apply_edges(lambda edges: {'w1': edges.data['w'] / edges.dst['M']})
            # G.apply_edges(fn.e_div_v('w', 'W', 'w_norm'))
            # # усреднение по соседям с использованием нормализованных весов
            # G.update_all(fn.u_mul_e('h', 'w_norm', 'm'), fn.sum('m', 'h_N'))
            # но вообще-то оно и без этого нормально работает
            h_N = G.ndata['h_N']
            # 2 строка SAGE
            h_total = torch.cat([h, h_N], dim=1)
            return self.linear(h_total)


class GCN(nn.Module):
    def __init__(self, n_input, n_hidden, n_output):
        super().__init__()
        self.conv1 = WeightedSAGEConv(n_input, n_hidden)
        self.conv2 = WeightedSAGEConv(n_hidden, n_output)

    def forward(self, G, in_features):
        out = F.relu(self.conv1(G, in_features))
        out = self.conv2(G, out)
        return out



In [24]:
dataset = dgl.data.CoraGraphDataset()
G = dataset[0]
# добавили вес ребер
G.edata['weight'] = torch.ones((G.num_edges(), 1))

n_input = G.ndata['feat'].shape[1]
n_hidden = 16
n_out = dataset.num_classes
n_epochs = 100


model = GCN(n_input, n_hidden, n_out)
train_cora_node_classification(model, G)


  NumNodes: 2708
  NumEdges: 10556
  NumFeats: 1433
  NumClasses: 7
  NumTrainingSamples: 140
  NumValidationSamples: 500
  NumTestSamples: 1000
Done loading data from cached files.
In epoch 0, loss: 1.950, val acc: 0.122 (best 0.122), test acc: 0.130 (best 0.130)
In epoch 5, loss: 1.873, val acc: 0.444 (best 0.444), test acc: 0.408 (best 0.408)
In epoch 10, loss: 1.720, val acc: 0.438 (best 0.444), test acc: 0.419 (best 0.408)
In epoch 15, loss: 1.490, val acc: 0.502 (best 0.502), test acc: 0.469 (best 0.469)
In epoch 20, loss: 1.200, val acc: 0.568 (best 0.568), test acc: 0.527 (best 0.527)
In epoch 25, loss: 0.890, val acc: 0.640 (best 0.640), test acc: 0.618 (best 0.618)
In epoch 30, loss: 0.604, val acc: 0.718 (best 0.718), test acc: 0.707 (best 0.707)
In epoch 35, loss: 0.377, val acc: 0.758 (best 0.758), test acc: 0.734 (best 0.734)
In epoch 40, loss: 0.223, val acc: 0.758 (best 0.758), test acc: 0.757 (best 0.734)
In epoch 45, loss: 0.130, val acc: 0.758 (best 0.758), test acc:

Пример усреднения фичей соседей с учетом нормализованны весов

In [91]:
g = dgl.graph(([0, 1, 2, 3, 4], [1, 2, 3, 4, 2]))
g.ndata['x'] = torch.arange(g.num_nodes()*2).reshape(5, 2).float()
g.edata['w'] = torch.arange(1, g.num_edges()+1).reshape(-1, 1).float()

# сумма весов входящих ребер
g.update_all(fn.copy_e('w', 'm'), fn.sum('m', 'W'))
# получение нормализованных весов
# второй вариант эквивалентен первому
# g.apply_edges(lambda edges: {'w1': edges.data['w'] / edges.dst['M']})
g.apply_edges(fn.e_div_v('w', 'W', 'w_norm'))
# усреднение по соседям с использованием нормализованных весов
g.update_all(fn.u_mul_e('x', 'w_norm', 'm'), fn.sum('m', 'h_N'))
