# GconvGRU

Chaque modelName correspond à un graphe à un instant donné.

DynamicGraphTemporalSignal : graphe qui peut évoluer au cours du temps (ajout ou suppression de nœuds / d'arêtes).

## Sans visualisation graphique

In [None]:
import torch
import numpy as np
import torch.nn.functional as F
from torch_geometric_temporal.signal import DynamicGraphTemporalSignal
from torch_geometric_temporal.nn.recurrent import GConvGRU
import matplotlib.pyplot as plt


from torch.nn import L1Loss
mae_fn = L1Loss()

# Toy dynamic dataset: three snapshots, each described by four parallel lists
# (connectivity, edge weights, node features, node targets).

# Every edge is written as a pair of node indices; transposing turns each
# list of pairs into an array of shape (2, num_edges).
edge_indices = [
    np.array(pairs).T
    for pairs in (
        [[0, 1], [1, 2]],
        [[0, 2], [2, 1]],
        [[1, 0], [2, 0]],
    )
]

# One weight per edge, in the same order as the pairs above.
edge_weights = [
    np.array(weights)
    for weights in (
        [1.0, 2.0],
        [0.5, 1.5],
        [2.0, 1.0],
    )
]

# One row per node, two integer features per node.
features = [
    np.array(rows)
    for rows in (
        [[1, 0], [0, 1], [1, 1]],
        [[2, 0], [1, 1], [0, 1]],
        [[1, 1], [2, 1], [1, 0]],
    )
]

# One scalar regression target per node.
targets = [
    np.array(values)
    for values in (
        [0.1, 0.2, 0.3],
        [0.2, 0.1, 0.4],
        [0.3, 0.2, 0.5],
    )
]

# Bundle the four parallel per-snapshot lists into a single temporal signal;
# the loops below iterate over it to obtain one snapshot at a time.
dataset = DynamicGraphTemporalSignal(
    features=features,
    targets=targets,
    edge_weights=edge_weights,
    edge_indices=edge_indices,
)

# GConvGRU-based model
class GConvGRUModel(torch.nn.Module):
    """Recurrent graph model: a ``GConvGRU`` cell plus a per-node linear head.

    Args:
        in_channels: Number of input features per node.
        out_channels: Size of the recurrent hidden state per node.
        K: Filter size forwarded to ``GConvGRU``. Defaults to 2, the value
            that used to be hard-coded, so existing callers are unaffected.
    """

    def __init__(self, in_channels, out_channels, K=2):
        super().__init__()
        self.recurrent = GConvGRU(
            in_channels=in_channels,
            out_channels=out_channels,
            K=K,
        )
        # Reduces each node's hidden state to a single scalar prediction.
        self.linear = torch.nn.Linear(out_channels, 1)

    def forward(self, x, edge_index, edge_weight=None, h=None):
        """Advance the recurrent cell by one snapshot and predict per node.

        Args:
            x: Node features of the current snapshot.
            edge_index: Connectivity of the current snapshot.
            edge_weight: Optional per-edge weights; forwarded unchanged to
                the recurrent cell.
            h: Hidden state returned by the previous call, or ``None`` for
                the first snapshot of a sequence (the value is forwarded
                unchanged to the recurrent cell).

        Returns:
            A tuple ``(out, h)``: ``out`` is the output of the linear head
            (last dimension of size 1) and ``h`` is the new hidden state to
            pass to the next call.
        """
        h = self.recurrent(x, edge_index, edge_weight, h)
        # The non-linearity is applied only on the read-out path; the raw
        # hidden state is what gets carried to the next time step.
        out = self.linear(F.relu(h))
        return out, h

# Model, optimiser and training criterion
model = GConvGRUModel(in_channels=2, out_channels=4)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
loss_fn = torch.nn.MSELoss()

# Training: one optimiser step per snapshot, hidden state carried forward
# within an epoch and reset at the start of the next one.
model.train()
for epoch in range(300):
    loss_total = 0.0
    mae_total = 0.0
    # No manual zero-initialisation: GConvGRU builds a zero hidden state of
    # the right size itself when it receives None.
    h = None
    for snapshot in dataset:
        x = torch.as_tensor(snapshot.x, dtype=torch.float)
        y = torch.as_tensor(snapshot.y, dtype=torch.float).view(-1, 1)
        # The arrays given to the dataset are already shaped (2, num_edges).
        # Transposing here again would flip them to (num_edges, 2), which
        # only "works" by accident when num_edges == 2 and silently rewires
        # the graph.
        edge_index = torch.as_tensor(snapshot.edge_index, dtype=torch.long)
        # DynamicGraphTemporalSignal stores the edge weights under
        # `edge_attr`; `snapshot.edge_weight` is never populated, so reading
        # it made every weight fall back to 1.
        edge_attr = snapshot.edge_attr
        edge_weight = (
            torch.as_tensor(edge_attr, dtype=torch.float)
            if edge_attr is not None
            else torch.ones(edge_index.shape[1])
        )

        optimizer.zero_grad()
        out, h = model(x, edge_index, edge_weight, h)
        loss = loss_fn(out, y)
        # The hidden state is detached below, so every step has its own
        # graph and retain_graph is not needed.
        loss.backward()
        optimizer.step()

        # Cut the autograd history so the next step does not backpropagate
        # into this one.
        h = h.detach()
        loss_total += loss.item()
        # MAE is only a reported metric; keep it out of the autograd graph.
        mae_total += mae_fn(out.detach(), y).item()
    # Both figures are sums over the snapshots of the epoch.
    print(f"Epoch {epoch+1}, Loss: {loss_total:.4f}, MAE: {mae_total:.4f}")

# Prediction: replay the sequence once and print predictions next to targets.
print("\n Predictions :")
model.eval()
h = None
# Inference only: skip autograd bookkeeping.
with torch.no_grad():
    for snapshot in dataset:
        x = torch.as_tensor(snapshot.x, dtype=torch.float)
        y = torch.as_tensor(snapshot.y, dtype=torch.float)
        # Already shaped (2, num_edges); a second transpose would rewire the
        # graph (and fail outright whenever num_edges != 2).
        edge_index = torch.as_tensor(snapshot.edge_index, dtype=torch.long)
        # Edge weights live under `edge_attr` on the snapshots produced by
        # DynamicGraphTemporalSignal, not under `edge_weight`.
        edge_attr = snapshot.edge_attr
        edge_weight = (
            torch.as_tensor(edge_attr, dtype=torch.float)
            if edge_attr is not None
            else torch.ones(edge_index.shape[1])
        )

        out, h = model(x, edge_index, edge_weight, h)
        pred = out.view(-1).numpy()
        real = y.numpy()
        print(f"Prédiction : {pred}")
        print(f"Réel      : {real}\n")
        # Per-node breakdown, one blank line after each node.
        for i, (p, r) in enumerate(zip(pred, real)):
            print(f"Noeud {i} -> Prédiction: {p:.3f}, Réel: {r:.3f}")
            print()



## Avec matplotlib (visualisation graphique : données réelles vs prédictions)

In [None]:
import torch
import numpy as np
import torch.nn.functional as F
from torch_geometric_temporal.signal import DynamicGraphTemporalSignal
from torch_geometric_temporal.nn.recurrent import GConvGRU
import matplotlib.pyplot as plt


from torch.nn import L1Loss
mae_fn = L1Loss()

# Toy dynamic dataset: three snapshots, each described by four parallel lists
# (connectivity, edge weights, node features, node targets).

# Every edge is written as a pair of node indices; transposing turns each
# list of pairs into an array of shape (2, num_edges).
edge_indices = [
    np.array(pairs).T
    for pairs in (
        [[0, 1], [1, 2]],
        [[0, 2], [2, 1]],
        [[1, 0], [2, 0]],
    )
]

# One weight per edge, in the same order as the pairs above.
edge_weights = [
    np.array(weights)
    for weights in (
        [1.0, 2.0],
        [0.5, 1.5],
        [2.0, 1.0],
    )
]

# One row per node, two integer features per node.
features = [
    np.array(rows)
    for rows in (
        [[1, 0], [0, 1], [1, 1]],
        [[2, 0], [1, 1], [0, 1]],
        [[1, 1], [2, 1], [1, 0]],
    )
]

# One scalar regression target per node.
targets = [
    np.array(values)
    for values in (
        [0.1, 0.2, 0.3],
        [0.2, 0.1, 0.4],
        [0.3, 0.2, 0.5],
    )
]

# Bundle the four parallel per-snapshot lists into a single temporal signal;
# the loops below iterate over it to obtain one snapshot at a time.
dataset = DynamicGraphTemporalSignal(
    features=features,
    targets=targets,
    edge_weights=edge_weights,
    edge_indices=edge_indices,
)

# GConvGRU-based model
class GConvGRUModel(torch.nn.Module):
    """Recurrent graph model: a ``GConvGRU`` cell plus a per-node linear head.

    Args:
        in_channels: Number of input features per node.
        out_channels: Size of the recurrent hidden state per node.
        K: Filter size forwarded to ``GConvGRU``. Defaults to 2, the value
            that used to be hard-coded, so existing callers are unaffected.
    """

    def __init__(self, in_channels, out_channels, K=2):
        super().__init__()
        self.recurrent = GConvGRU(
            in_channels=in_channels,
            out_channels=out_channels,
            K=K,
        )
        # Reduces each node's hidden state to a single scalar prediction.
        self.linear = torch.nn.Linear(out_channels, 1)

    def forward(self, x, edge_index, edge_weight=None, h=None):
        """Advance the recurrent cell by one snapshot and predict per node.

        Args:
            x: Node features of the current snapshot.
            edge_index: Connectivity of the current snapshot.
            edge_weight: Optional per-edge weights; forwarded unchanged to
                the recurrent cell.
            h: Hidden state returned by the previous call, or ``None`` for
                the first snapshot of a sequence (the value is forwarded
                unchanged to the recurrent cell).

        Returns:
            A tuple ``(out, h)``: ``out`` is the output of the linear head
            (last dimension of size 1) and ``h`` is the new hidden state to
            pass to the next call.
        """
        h = self.recurrent(x, edge_index, edge_weight, h)
        # The non-linearity is applied only on the read-out path; the raw
        # hidden state is what gets carried to the next time step.
        out = self.linear(F.relu(h))
        return out, h

# Model, optimiser and training criterion
model = GConvGRUModel(in_channels=2, out_channels=4)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
loss_fn = torch.nn.MSELoss()

# Training: one optimiser step per snapshot, hidden state carried forward
# within an epoch and reset at the start of the next one.
model.train()
for epoch in range(300):
    loss_total = 0.0
    mae_total = 0.0
    # No manual zero-initialisation: GConvGRU builds a zero hidden state of
    # the right size itself when it receives None.
    h = None
    for snapshot in dataset:
        x = torch.as_tensor(snapshot.x, dtype=torch.float)
        y = torch.as_tensor(snapshot.y, dtype=torch.float).view(-1, 1)
        # The arrays given to the dataset are already shaped (2, num_edges).
        # Transposing here again would flip them to (num_edges, 2), which
        # only "works" by accident when num_edges == 2 and silently rewires
        # the graph.
        edge_index = torch.as_tensor(snapshot.edge_index, dtype=torch.long)
        # DynamicGraphTemporalSignal stores the edge weights under
        # `edge_attr`; `snapshot.edge_weight` is never populated, so reading
        # it made every weight fall back to 1.
        edge_attr = snapshot.edge_attr
        edge_weight = (
            torch.as_tensor(edge_attr, dtype=torch.float)
            if edge_attr is not None
            else torch.ones(edge_index.shape[1])
        )

        optimizer.zero_grad()
        out, h = model(x, edge_index, edge_weight, h)
        loss = loss_fn(out, y)
        # The hidden state is detached below, so every step has its own
        # graph and retain_graph is not needed.
        loss.backward()
        optimizer.step()

        # Cut the autograd history so the next step does not backpropagate
        # into this one.
        h = h.detach()
        loss_total += loss.item()
        # MAE is only a reported metric; keep it out of the autograd graph.
        mae_total += mae_fn(out.detach(), y).item()
    # Both figures are sums over the snapshots of the epoch.
    print(f"Epoch {epoch+1}, Loss: {loss_total:.4f}, MAE: {mae_total:.4f}")

# Prediction: replay the sequence once and plot predictions against targets,
# one figure per snapshot.
print("\n📈 Predictions :")
model.eval()
h = None
# Inference only: skip autograd bookkeeping.
with torch.no_grad():
    for t, snapshot in enumerate(dataset):
        x = torch.as_tensor(snapshot.x, dtype=torch.float)
        y = torch.as_tensor(snapshot.y, dtype=torch.float)
        # Already shaped (2, num_edges); a second transpose would rewire the
        # graph (and fail outright whenever num_edges != 2).
        edge_index = torch.as_tensor(snapshot.edge_index, dtype=torch.long)
        # Edge weights live under `edge_attr` on the snapshots produced by
        # DynamicGraphTemporalSignal, not under `edge_weight`.
        edge_attr = snapshot.edge_attr
        edge_weight = (
            torch.as_tensor(edge_attr, dtype=torch.float)
            if edge_attr is not None
            else torch.ones(edge_index.shape[1])
        )
        out, h = model(x, edge_index, edge_weight, h)
        pred = out.view(-1).numpy()
        real = y.numpy()

        # x-axis: node index; the two curves share the same node ordering.
        plt.figure(figsize=(6, 4))
        nodes = np.arange(len(pred))
        plt.plot(nodes, real, 'o-', label='Valeurs réelles')
        plt.plot(nodes, pred, 'x--', label='Prédictions')
        plt.title(f"Comparaison prédictions vs valeurs réelles (snapshot {t})")
        plt.xlabel("Noeud")
        plt.ylabel("Valeur")
        plt.legend()
        plt.grid(True)
        plt.show()

