In [1]:
import torch
import torch.nn.functional as F
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

In [2]:
class AVWGCN(nn.Module):
    def __init__(self, dim_in, dim_out, cheb_k, embed_dim):
        super(AVWGCN, self).__init__()
        self.cheb_k = cheb_k  # Chebyshev卷积的阶数
        self.weights_pool = nn.Parameter(torch.FloatTensor(embed_dim, cheb_k, dim_in, dim_out))  # 学习得到的权重参数，用于Chebyshev卷积
        self.bias_pool = nn.Parameter(torch.FloatTensor(embed_dim, dim_out))  # 学习得到的偏置参数，用于Chebyshev卷积

    def forward(self, x, node_embeddings):
        # x的形状为 [B, N, C]，表示输入的节点特征；node_embeddings的形状为 [N, D]，表示节点的嵌入特征
        # supports的形状为 [N, N]，表示节点之间的邻接关系
        node_num = node_embeddings.shape[0]  # 节点的数量
        supports = F.softmax(F.relu(torch.mm(node_embeddings, node_embeddings.transpose(0, 1))), dim=1)  # 计算节点之间的邻接关系
        support_set = [torch.eye(node_num).to(supports.device), supports]  # 将自连接和计算得到的邻接关系添加到支持集中

        # 计算Chebyshev多项式的值，形成支持集
        for k in range(2, self.cheb_k):
            support_set.append(torch.matmul(2 * supports, support_set[-1]) - support_set[-2])
        supports = torch.stack(support_set, dim=0)

        # 计算权重和偏置
        weights = torch.einsum('nd,dkio->nkio', node_embeddings, self.weights_pool)  # 计算权重
        bias = torch.matmul(node_embeddings, self.bias_pool)  # 计算偏置

        # 执行Chebyshev卷积操作
        x_g = torch.einsum("knm,bmc->bknc", supports, x)  # 将输入节点特征与邻接关系相乘，得到支持集
        x_g = x_g.permute(0, 2, 1, 3)  # 调整维度顺序
        x_gconv = torch.einsum('bnki,nkio->bno', x_g, weights) + bias  # 执行Chebyshev卷积操作并加上偏置
        return x_gconv  # 返回Chebyshev卷积的结果

In [3]:
# 假设输入的节点特征维度为3，输出的节点特征维度为4，Chebyshev卷积的阶数为3，节点嵌入特征维度为5
dim_in = 3
dim_out = 4
cheb_k = 3
embed_dim = 5

# 创建AVWGCN模型实例
model = AVWGCN(dim_in, dim_out, cheb_k, embed_dim)

# 生成测试数据
# 假设有2个样本，每个样本包含5个节点，每个节点有3维特征
x = torch.randn(2, 5, 3)
# 假设节点嵌入特征维度为5
node_embeddings = torch.randn(5, 5)

# 执行前向传播
output = model(x, node_embeddings)

# 打印输出结果的形状
print("输出结果形状:", output.shape)

输出结果形状: torch.Size([2, 5, 4])


In [9]:
class AGCRNCell(nn.Module):
    def __init__(self, node_num, dim_in, dim_out, cheb_k, embed_dim):
        super(AGCRNCell, self).__init__()
        self.node_num = node_num  # 节点数量
        self.hidden_dim = dim_out  # 隐藏状态维度
        self.gate = AVWGCN(dim_in+self.hidden_dim, 2*dim_out, cheb_k, embed_dim)  # 更新门和重置门
        self.update = AVWGCN(dim_in+self.hidden_dim, dim_out, cheb_k, embed_dim)  # 候选状态更新

    def forward(self, x, state, node_embeddings):
        # x的形状为 [B, num_nodes, input_dim]
        # state的形状为 [B, num_nodes, hidden_dim]
        state = state.to(x.device)
        input_and_state = torch.cat((x, state), dim=-1)  # 将输入和隐藏状态连接起来
        z_r = torch.sigmoid(self.gate(input_and_state, node_embeddings))  # 计算更新门和重置门的值
        z, r = torch.split(z_r, self.hidden_dim, dim=-1)  # 将输出切分为更新门和重置门
        candidate = torch.cat((x, z*state), dim=-1)  # 计算候选状态
        hc = torch.tanh(self.update(candidate, node_embeddings))  # 更新候选状态
        h = r*state + (1-r)*hc  # 计算最终的隐藏状态
        return h  # 返回隐藏状态 -> [B, num_nodes, hidden_dim/dim_out]

    def init_hidden_state(self, batch_size):
        # 初始化隐藏状态
        return torch.zeros(batch_size, self.node_num, self.hidden_dim)


In [10]:
# 假设节点数量为5，输入特征维度为3，输出特征维度为4，Chebyshev卷积的阶数为3，节点嵌入特征维度为5
node_num = 5
dim_in = 3
dim_out = 4
cheb_k = 3
embed_dim = 5

# 创建AGCRNCell模型实例
model = AGCRNCell(node_num, dim_in, dim_out, cheb_k, embed_dim)

# 生成测试数据
# 假设有2个样本，每个样本包含5个节点，每个节点有3维特征
x = torch.randn(2, 5, 3)
# 初始隐藏状态，假设有2个样本，每个样本包含5个节点，每个节点有4维隐藏状态
state = model.init_hidden_state(2)
# 假设节点嵌入特征维度为5
node_embeddings = torch.randn(5, 5)

# 执行前向传播
output = model(x, state, node_embeddings)

# 打印输出结果的形状
print("输出结果形状:", output.shape)

输出结果形状: torch.Size([2, 5, 4])


In [12]:
class AVWDCRNN(nn.Module):
    def __init__(self, node_num, dim_in, dim_out, cheb_k, embed_dim, num_layers=1):
        super(AVWDCRNN, self).__init__()
        assert num_layers >= 1, 'At least one DCRNN layer in the Encoder.'
        self.node_num = node_num  # 节点数量
        self.input_dim = dim_in  # 输入特征维度
        self.num_layers = num_layers  # DCRNN层数
        self.dcrnn_cells = nn.ModuleList()
        self.dcrnn_cells.append(AGCRNCell(node_num, dim_in, dim_out, cheb_k, embed_dim))  # 第一个DCRNN层
        for _ in range(1, num_layers):
            self.dcrnn_cells.append(AGCRNCell(node_num, dim_out, dim_out, cheb_k, embed_dim))  # 其他DCRNN层

    def forward(self, x, init_state, node_embeddings):
        # x形状: (B, T, N, D)
        # init_state形状: (num_layers, B, N, hidden_dim)
        assert x.shape[2] == self.node_num and x.shape[3] == self.input_dim
        seq_length = x.shape[1]
        current_inputs = x
        output_hidden = []
        for i in range(self.num_layers):
            state = init_state[i]
            inner_states = []
            for t in range(seq_length):
                state = self.dcrnn_cells[i](current_inputs[:, t, :, :], state, node_embeddings)
                inner_states.append(state)
            output_hidden.append(state)
            current_inputs = torch.stack(inner_states, dim=1)
        # current_inputs: 最后一层的输出: (B, T, N, hidden_dim)
        # output_hidden: 每层的最后状态: (num_layers, B, N, hidden_dim)
        # last_state: (B, N, hidden_dim)
        return current_inputs, output_hidden

    def init_hidden(self, batch_size):
        # 初始化隐藏状态
        init_states = []
        for i in range(self.num_layers):
            init_states.append(self.dcrnn_cells[i].init_hidden_state(batch_size))
        return torch.stack(init_states, dim=0)  # (num_layers, B, N, hidden_dim)


In [13]:
# 假设节点数量为5，输入特征维度为3，输出特征维度为4，Chebyshev卷积的阶数为3，节点嵌入特征维度为5，DCRNN层数为2
node_num = 5
dim_in = 3
dim_out = 4
cheb_k = 3
embed_dim = 5
num_layers = 2

# 创建AVWDCRNN模型实例
model = AVWDCRNN(node_num, dim_in, dim_out, cheb_k, embed_dim, num_layers)

# 生成测试数据
# 假设有2个样本，每个样本包含10个时间步，每个时间步有5个节点，每个节点有3维特征
x = torch.randn(2, 10, 5, 3)
# 初始隐藏状态
init_state = model.init_hidden(2)
# 假设节点嵌入特征维度为5
node_embeddings = torch.randn(5, 5)

# 执行前向传播
output, output_hidden = model(x, init_state, node_embeddings)

# 打印输出结果的形状
print("输出结果形状:", output.shape)
print("输出隐藏状态形状:", torch.stack(output_hidden, dim=0).shape)

输出结果形状: torch.Size([2, 10, 5, 4])
输出隐藏状态形状: torch.Size([2, 2, 5, 4])


In [15]:
class AGCRN(nn.Module):
    def __init__(self, args):
        super(AGCRN, self).__init__()
        self.num_node = args.num_nodes  # 节点数量
        self.input_dim = args.input_dim  # 输入特征维度
        self.hidden_dim = args.rnn_units  # 隐藏状态维度
        self.output_dim = args.output_dim  # 输出特征维度
        self.horizon = args.horizon  # 预测步长
        self.num_layers = args.num_layers  # 层数

        self.default_graph = args.default_graph  # 默认图结构
        self.node_embeddings = nn.Parameter(torch.randn(self.num_node, args.embed_dim), requires_grad=True)  # 节点嵌入

        self.encoder = AVWDCRNN(args.num_nodes, args.input_dim, args.rnn_units, args.cheb_k,
                                args.embed_dim, args.num_layers)  # 编码器

        # 预测器
        self.end_conv = nn.Conv2d(1, args.horizon * self.output_dim, kernel_size=(1, self.hidden_dim), bias=True)

    def forward(self, source, targets, teacher_forcing_ratio=0.5):
        # source: B, T_1, N, D
        # targets: B, T_2, N, D

        init_state = self.encoder.init_hidden(source.shape[0])  # 初始化隐藏状态
        output, _ = self.encoder(source, init_state, self.node_embeddings)  # 编码器前向传播，得到输出和隐藏状态
        output = output[:, -1:, :, :]  # 取最后一个时间步的输出，形状为 (B, 1, N, hidden)

        # 基于CNN的预测器
        output = self.end_conv(output)  # 卷积操作，形状为 (B, T*C, N, 1)
        output = output.squeeze(-1).reshape(-1, self.horizon, self.output_dim, self.num_node)  # 调整形状
        output = output.permute(0, 1, 3, 2)  # 形状调整为 (B, T, N, C)

        return output  # 返回预测结果

In [16]:
# 定义参数类
class Args:
    num_nodes = 5
    input_dim = 3
    rnn_units = 4
    output_dim = 4
    horizon = 10
    num_layers = 2
    cheb_k = 3
    embed_dim = 5
    default_graph = True

# 创建参数实例
args = Args()

# 创建AGCRN模型实例
model = AGCRN(args)

# 生成测试数据
# 假设有2个样本，每个样本包含10个时间步，每个时间步有5个节点，每个节点有3维特征
source = torch.randn(2, 10, 5, 3)
# 目标数据，假设有2个样本，每个样本包含10个时间步，每个时间步有5个节点，每个节点有2维特征
targets = torch.randn(2, 10, 5, 2)

# 执行前向传播
output = model(source, targets)

# 打印输出结果的形状
print("输出结果形状:", output.shape)

输出结果形状: torch.Size([2, 10, 5, 4])
