In [1]:
from torch.utils.data import Dataset, DataLoader
import torch
import dgl
class chunkDataset(Dataset): #[node_num, T, else]
    """
           初始化函数，用于构建图数据结构并存储相关数据。

           Args:
               chunks (dict): 包含多个数据块的字典，每个数据块包含日志、指标、追踪信息以及对应的错误标签。
               node_num (int): 图中节点的数量。
               edges (tuple): 图的边信息，包含两个列表，分别表示边的源节点和目标节点。
           """
    def __init__(self, chunks, node_num, edges):
        # 存储图数据及其对应的错误标签
        self.data = []
        # 用于将索引映射到数据块ID的字典
        self.idx2id = {}
        # 遍历chunks字典，构建图数据结构并存储相关信息
        for idx, chunk_id in enumerate(chunks.keys()):
            # 将索引映射到数据块ID
            self.idx2id[idx] = chunk_id
            chunk = chunks[chunk_id]
            # 使用DGL库创建有向图，并设置节点特征，edges[0] 和 edges[1] 分别表示图中的源节点和目标节点。0->1
            graph = dgl.graph((edges[0], edges[1]), num_nodes=node_num)
            # 设置节点的日志特征，graph.ndata 是一个字典，用于存储图中节点的特征数据。每个键对应一个特征名称，值是一个张量（tensor），表示所有节点在该特征上的值
            # torch.FloatTensor 是 PyTorch 中的一个函数，用于将输入数据转换为浮点型张量（tensor）。
            graph.ndata["logs"] = torch.FloatTensor(chunk["logs"])
            # 设置节点的指标特征
            graph.ndata["metrics"] = torch.FloatTensor(chunk["metrics"])
            # 设置节点的追踪特征
            graph.ndata["traces"] = torch.FloatTensor(chunk["traces"])
            # 将图及其对应的错误节点存储到data列表中
            # 如果 chunk["culprit"] 为 -1，表示该数据块中没有故障节点。
            # 否则，chunk["culprit"] 表示故障节点的索引（从 0 开始）
            # 这样做的目的是将每个数据块的图结构和对应的标签组合在一起，形成一个完整的数据项，方便后续的数据加载和处理。
            self.data.append((graph, chunk["culprit"]))

    def __len__(self):
        return len(self.data)
    def __getitem__(self, idx):
        return self.data[idx]
    def __get_chunk_id__(self, idx):
        return self.idx2id[idx]

from utils import *
from base import BaseModel

import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--random_seed", default=42, type=int)

### Training params
parser.add_argument("--gpu", default=True, type=lambda x: x.lower() == "true")
parser.add_argument("--epoches", default=70, type=int)
parser.add_argument("--batch_size", default=256, type=int)
parser.add_argument("--lr", default=0.001, type=float)
parser.add_argument("--patience", default=10, type=int)
parser.add_argument("--node_feat_dim", default=64, type=int)

##### Fuse params
parser.add_argument("--self_attn", default=True, type=lambda x: x.lower() == "true")
# parser.add_argument("--self_attn", default=False, type=lambda x: x.lower() == "tru e")
parser.add_argument("--fuse_dim", default=128, type=int)
parser.add_argument("--alpha", default=0.5, type=float)
parser.add_argument("--beta", default=0.1, type=float)
parser.add_argument("--locate_hiddens", default=[64], type=int, nargs='+')
parser.add_argument("--detect_hiddens", default=[64], type=int, nargs='+')
parser.add_argument("--detector_rank", default=16, type=int)
parser.add_argument("--locator_rank", default=16, type=int)

##### Source params
parser.add_argument("--log_dim", default=16, type=int)
parser.add_argument("--trace_kernel_sizes", default=[2], type=int, nargs='+')
parser.add_argument("--trace_hiddens", default=[64], type=int, nargs='+')
parser.add_argument("--metric_kernel_sizes", default=[2], type=int, nargs='+')
parser.add_argument("--metric_hiddens", default=[64], type=int, nargs='+')
parser.add_argument("--graph_hiddens", default=[64], type=int, nargs='+')
parser.add_argument("--attn_head", default=4, type=int, help="For gat or gat-v2")
parser.add_argument("--activation", default=0.2, type=float, help="use LeakyReLU, shoule be in (0,1)")

##### TimesNet-specific params
parser.add_argument("--seq_len", default=None, type=int, help="Input sequence length for TimesNet")
parser.add_argument('--label_len', type=int, default=48, help='start token length')
parser.add_argument("--pred_len", default=0, type=int, help="Prediction length for TimesNet")
parser.add_argument("--enc_in", default=2, type=int, help="Input feature dimension for TimesNet")
parser.add_argument('--dropout', type=float, default=0.1, help='dropout')
parser.add_argument('--embed', type=str, default='timeF', help='time features encoding, options:[timeF, fixed, learned]')
parser.add_argument('--freq', type=str, default='h', help='freq for time features encoding, options:[s:secondly, t:minutely, h:hourly, d:daily, b:business days, w:weekly, m:monthly], you can also use more detailed freq like 15min or 3h')
parser.add_argument("--c_out", default=2, type=int, help="Output feature dimension for TimesNet")
parser.add_argument("--d_model", default=2, type=int, help="Model hidden dimension for TimesNet")
parser.add_argument("--d_ff", default=256, type=int, help="Feed-forward network dimension for TimesNet")
parser.add_argument("--num_kernels", default=3, type=int, help="Number of convolutional kernels for TimesNet")
parser.add_argument("--e_layers", default=2, type=int, help="Number of TimesNet layers")
parser.add_argument("--task_name", default="anomaly_detection", type=str, help="Task type for TimesNet")
parser.add_argument("--top_k", default=3, type=int, help="Top-k frequencies to extract in FFT")


##### Data params
parser.add_argument("--data", type=str, default="SN")
parser.add_argument("--result_dir", default="../result/")

### add_module
parser.add_argument("--use_transformer", default=True, type=lambda x: x.lower() == "true", help="Use TransformerEncoder for TraceModel and MetricModel")
parser.add_argument("--use_CGLU", default=True, type=lambda x: x.lower() == "true", help="Use CGLU for MultsourceEncoder")
parser.add_argument("--use_TraceDifussion", default=True, type=lambda x: x.lower() == "true", help="Use CGLU for GraphModel")
parser.add_argument("--use_crossModalAttenion", default=False, type=lambda x: x.lower() == "true", help="Use crossModalAttenion for MultsourceEncoder")
parser.add_argument("--use_EvolveGCN", default=False, type=lambda x: x.lower() == "true", help="Use use_EvolveGCN for GraphModel")

args, unknown_args = parser.parse_known_args()  # 替换 your_dataset_name
params = vars(args)  # 正确操作：将命名空间对象转换为 dict

import logging
def get_device(gpu):
    if gpu and torch.cuda.is_available():
        logging.info("Using GPU...")
        return torch.device("cuda")
    logging.info("Using CPU...")
    return torch.device("cpu")
    

def collate(data):
    """
       对数据进行整理，将图形数据和标签分离并批处理。

       参数:
       - data: 一个列表，其中包含多个元组，每个元组包含一个图和对应的标签。

       返回:
       - batched_graph: 批处理后的图数据。
       - torch.tensor(labels): 标签的张量表示。
       """
    graphs, labels = map(list, zip(*data))
    batched_graph = dgl.batch(graphs)
    return batched_graph , torch.tensor(labels)

def run(evaluation_epoch=10):
    # 设定数据路径
    data_dir = os.path.join("./chunks", params["data"])
    # 从json文件中加载元数据
    metadata = read_json(os.path.join(data_dir, "metadata.json"))
    # 从元数据中获取事件数量、节点数量和指标数量、边信息
    event_num, node_num, metric_num =  metadata["event_num"], metadata["node_num"], metadata["metric_num"]
    edges = metadata["edges"]
    # 读取元数据中chunk长度并设置进参数中
    params["chunk_lenth"] = metadata["chunk_lenth"]

    ############# 如果未指定 seq_len，则使用 chunk_lenth #########
    if params["seq_len"] is None:
        params["seq_len"] = params["chunk_lenth"]

    # 生成实验唯一标识并配置运行环境
    # 包含参数哈希化、随机种子固定、GPU设备选择等初始化操作
    hash_id = dump_params(params)
    params["hash_id"] = hash_id
    seed_everything(params["random_seed"])
    device = get_device(params["gpu"])

    #加载预处理后的数据块
    train_chunks, test_chunks = load_chunks(data_dir)
    # 构建数据集和数据加载器
    # 使用自定义的chunkDataset处理图结构数据
    train_data = chunkDataset(train_chunks, node_num, edges)
    test_data = chunkDataset(test_chunks, node_num, edges)
    # 动态计算总特征维度
    graph_example = train_data[0][0]  # 获取第一个图样本
    feature_keys = ['logs', 'metrics', 'traces']
    total_node_feat_dim = sum([
        graph_example.ndata[key].view(graph_example.ndata[key].size(0), -1).size(1)  # 展平后计算维度
        for key in feature_keys
    ])
    params["node_feat_dim"] = total_node_feat_dim  # 设置总特征维度


    train_dl = DataLoader(train_data, batch_size=params["batch_size"], shuffle=True, collate_fn=collate, pin_memory=True)
    test_dl = DataLoader(test_data, batch_size=params["batch_size"], shuffle=True, collate_fn=collate, pin_memory=True)
    # 初始化基础模型并启动训练流程
    model = BaseModel(event_num, metric_num, node_num, device, **params)
    # fit方法返回评估指标得分和收敛状
    scores, converge = model.fit(train_dl, test_dl, evaluation_epoch=evaluation_epoch)
    module_info = f"Transformer: {params['use_transformer']}, " \
                  f"TraceDifussion: {params['use_TraceDifussion']}, " \
                  f"CGLU: {params['use_CGLU']}, " \
                  f"crossModalAttenion: { params['use_crossModalAttenion']}, " \
                  f"EvolveGCN: {params['use_EvolveGCN']} "


        # 将实验结果保存到指定目录下，并记录实验唯一标识
    dump_scores(params["result_dir"], hash_id, scores, converge, params["data"], params["epoches"], params["lr"], params["gpu"], module_info)
    logging.info("Current hash_id {}".format(hash_id))

if "__main__" == __name__:
    run()

2025-04-25 15:58:40,736 P20988 INFO Using GPU...
2025-04-25 15:58:40,737 P20988 INFO Load from ./chunks\SN
2025-04-25 15:58:46,934 P20988 INFO Model Parameters: hash_id=b52b1a7d, data=SN, device=cuda, lr=0.001, epoches=70
2025-04-25 15:58:54,830 P20988 INFO Epoch 1/70, training loss: 1.39945 [7.89s]
2025-04-25 15:58:58,060 P20988 INFO Epoch 2/70, training loss: 0.81892 [3.23s]
2025-04-25 15:59:01,316 P20988 INFO Epoch 3/70, training loss: 0.24630 [3.25s]
2025-04-25 15:59:04,627 P20988 INFO Epoch 4/70, training loss: 0.15544 [3.31s]
2025-04-25 15:59:07,931 P20988 INFO Epoch 5/70, training loss: 0.09936 [3.30s]
2025-04-25 15:59:11,257 P20988 INFO Epoch 6/70, training loss: 0.06911 [3.32s]
2025-04-25 15:59:14,555 P20988 INFO Epoch 7/70, training loss: 0.04737 [3.30s]
2025-04-25 15:59:17,827 P20988 INFO Epoch 8/70, training loss: 0.04054 [3.27s]
2025-04-25 15:59:21,120 P20988 INFO Epoch 9/70, training loss: 0.03548 [3.29s]
2025-04-25 15:59:27,244 P20988 INFO  testing loss: 0.03953 
2025-04

In [5]:
from torch.utils.data import Dataset, DataLoader
import torch
import dgl
class chunkDataset(Dataset): #[node_num, T, else]
    """
           初始化函数，用于构建图数据结构并存储相关数据。

           Args:
               chunks (dict): 包含多个数据块的字典，每个数据块包含日志、指标、追踪信息以及对应的错误标签。
               node_num (int): 图中节点的数量。
               edges (tuple): 图的边信息，包含两个列表，分别表示边的源节点和目标节点。
           """
    def __init__(self, chunks, node_num, edges):
        # 存储图数据及其对应的错误标签
        self.data = []
        # 用于将索引映射到数据块ID的字典
        self.idx2id = {}
        # 遍历chunks字典，构建图数据结构并存储相关信息
        for idx, chunk_id in enumerate(chunks.keys()):
            # 将索引映射到数据块ID
            self.idx2id[idx] = chunk_id
            chunk = chunks[chunk_id]
            # 使用DGL库创建有向图，并设置节点特征，edges[0] 和 edges[1] 分别表示图中的源节点和目标节点。0->1
            graph = dgl.graph((edges[0], edges[1]), num_nodes=node_num)
            # 设置节点的日志特征，graph.ndata 是一个字典，用于存储图中节点的特征数据。每个键对应一个特征名称，值是一个张量（tensor），表示所有节点在该特征上的值
            # torch.FloatTensor 是 PyTorch 中的一个函数，用于将输入数据转换为浮点型张量（tensor）。
            graph.ndata["logs"] = torch.FloatTensor(chunk["logs"])
            # 设置节点的指标特征
            graph.ndata["metrics"] = torch.FloatTensor(chunk["metrics"])
            # 设置节点的追踪特征
            graph.ndata["traces"] = torch.FloatTensor(chunk["traces"])
            # 将图及其对应的错误节点存储到data列表中
            # 如果 chunk["culprit"] 为 -1，表示该数据块中没有故障节点。
            # 否则，chunk["culprit"] 表示故障节点的索引（从 0 开始）
            # 这样做的目的是将每个数据块的图结构和对应的标签组合在一起，形成一个完整的数据项，方便后续的数据加载和处理。
            self.data.append((graph, chunk["culprit"]))

    def __len__(self):
        return len(self.data)
    def __getitem__(self, idx):
        return self.data[idx]
    def __get_chunk_id__(self, idx):
        return self.idx2id[idx]

from utils import *
from base import BaseModel

import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--random_seed", default=42, type=int)

### Training params
parser.add_argument("--gpu", default=True, type=lambda x: x.lower() == "true")
parser.add_argument("--epoches", default=70, type=int)
parser.add_argument("--batch_size", default=256, type=int)
parser.add_argument("--lr", default=0.001, type=float)
parser.add_argument("--patience", default=10, type=int)
parser.add_argument("--node_feat_dim", default=64, type=int)

##### Fuse params
parser.add_argument("--self_attn", default=True, type=lambda x: x.lower() == "true")
# parser.add_argument("--self_attn", default=False, type=lambda x: x.lower() == "tru e")
parser.add_argument("--fuse_dim", default=128, type=int)
parser.add_argument("--alpha", default=0.5, type=float)
parser.add_argument("--beta", default=0.1, type=float)
parser.add_argument("--locate_hiddens", default=[64], type=int, nargs='+')
parser.add_argument("--detect_hiddens", default=[64], type=int, nargs='+')
parser.add_argument("--detector_rank", default=16, type=int)
parser.add_argument("--locator_rank", default=16, type=int)

##### Source params
parser.add_argument("--log_dim", default=16, type=int)
parser.add_argument("--trace_kernel_sizes", default=[2], type=int, nargs='+')
parser.add_argument("--trace_hiddens", default=[64], type=int, nargs='+')
parser.add_argument("--metric_kernel_sizes", default=[2], type=int, nargs='+')
parser.add_argument("--metric_hiddens", default=[64], type=int, nargs='+')
parser.add_argument("--graph_hiddens", default=[64], type=int, nargs='+')
parser.add_argument("--attn_head", default=4, type=int, help="For gat or gat-v2")
parser.add_argument("--activation", default=0.2, type=float, help="use LeakyReLU, shoule be in (0,1)")

##### TimesNet-specific params
parser.add_argument("--seq_len", default=None, type=int, help="Input sequence length for TimesNet")
parser.add_argument('--label_len', type=int, default=48, help='start token length')
parser.add_argument("--pred_len", default=0, type=int, help="Prediction length for TimesNet")
parser.add_argument("--enc_in", default=2, type=int, help="Input feature dimension for TimesNet")
parser.add_argument('--dropout', type=float, default=0.1, help='dropout')
parser.add_argument('--embed', type=str, default='timeF', help='time features encoding, options:[timeF, fixed, learned]')
parser.add_argument('--freq', type=str, default='h', help='freq for time features encoding, options:[s:secondly, t:minutely, h:hourly, d:daily, b:business days, w:weekly, m:monthly], you can also use more detailed freq like 15min or 3h')
parser.add_argument("--c_out", default=2, type=int, help="Output feature dimension for TimesNet")
parser.add_argument("--d_model", default=2, type=int, help="Model hidden dimension for TimesNet")
parser.add_argument("--d_ff", default=256, type=int, help="Feed-forward network dimension for TimesNet")
parser.add_argument("--num_kernels", default=3, type=int, help="Number of convolutional kernels for TimesNet")
parser.add_argument("--e_layers", default=2, type=int, help="Number of TimesNet layers")
parser.add_argument("--task_name", default="anomaly_detection", type=str, help="Task type for TimesNet")
parser.add_argument("--top_k", default=3, type=int, help="Top-k frequencies to extract in FFT")


##### Data params
parser.add_argument("--data", type=str, default="TT")
parser.add_argument("--result_dir", default="../result/")

### add_module
parser.add_argument("--use_transformer", default=True, type=lambda x: x.lower() == "true", help="Use TransformerEncoder for TraceModel and MetricModel")
parser.add_argument("--use_CGLU", default=True, type=lambda x: x.lower() == "true", help="Use CGLU for MultsourceEncoder")
parser.add_argument("--use_TraceDifussion", default=True, type=lambda x: x.lower() == "true", help="Use CGLU for GraphModel")
parser.add_argument("--use_crossModalAttenion", default=False, type=lambda x: x.lower() == "true", help="Use crossModalAttenion for MultsourceEncoder")
parser.add_argument("--use_EvolveGCN", default=False, type=lambda x: x.lower() == "true", help="Use use_EvolveGCN for GraphModel")

args, unknown_args = parser.parse_known_args()  # 替换 your_dataset_name
params = vars(args)  # 正确操作：将命名空间对象转换为 dict

import logging
def get_device(gpu):
    if gpu and torch.cuda.is_available():
        logging.info("Using GPU...")
        return torch.device("cuda")
    logging.info("Using CPU...")
    return torch.device("cpu")
    

def collate(data):
    """
       对数据进行整理，将图形数据和标签分离并批处理。

       参数:
       - data: 一个列表，其中包含多个元组，每个元组包含一个图和对应的标签。

       返回:
       - batched_graph: 批处理后的图数据。
       - torch.tensor(labels): 标签的张量表示。
       """
    graphs, labels = map(list, zip(*data))
    batched_graph = dgl.batch(graphs)
    return batched_graph , torch.tensor(labels)

def run(evaluation_epoch=10):
    # 设定数据路径
    data_dir = os.path.join("./chunks", params["data"])
    # 从json文件中加载元数据
    metadata = read_json(os.path.join(data_dir, "metadata.json"))
    # 从元数据中获取事件数量、节点数量和指标数量、边信息
    event_num, node_num, metric_num =  metadata["event_num"], metadata["node_num"], metadata["metric_num"]
    edges = metadata["edges"]
    # 读取元数据中chunk长度并设置进参数中
    params["chunk_lenth"] = metadata["chunk_lenth"]

    ############# 如果未指定 seq_len，则使用 chunk_lenth #########
    if params["seq_len"] is None:
        params["seq_len"] = params["chunk_lenth"]

    # 生成实验唯一标识并配置运行环境
    # 包含参数哈希化、随机种子固定、GPU设备选择等初始化操作
    hash_id = dump_params(params)
    params["hash_id"] = hash_id
    seed_everything(params["random_seed"])
    device = get_device(params["gpu"])

    #加载预处理后的数据块
    train_chunks, test_chunks = load_chunks(data_dir)
    # 构建数据集和数据加载器
    # 使用自定义的chunkDataset处理图结构数据
    train_data = chunkDataset(train_chunks, node_num, edges)
    test_data = chunkDataset(test_chunks, node_num, edges)
    # 动态计算总特征维度
    graph_example = train_data[0][0]  # 获取第一个图样本
    feature_keys = ['logs', 'metrics', 'traces']
    total_node_feat_dim = sum([
        graph_example.ndata[key].view(graph_example.ndata[key].size(0), -1).size(1)  # 展平后计算维度
        for key in feature_keys
    ])
    params["node_feat_dim"] = total_node_feat_dim  # 设置总特征维度


    train_dl = DataLoader(train_data, batch_size=params["batch_size"], shuffle=True, collate_fn=collate, pin_memory=True)
    test_dl = DataLoader(test_data, batch_size=params["batch_size"], shuffle=True, collate_fn=collate, pin_memory=True)
    # 初始化基础模型并启动训练流程
    model = BaseModel(event_num, metric_num, node_num, device, **params)
    # fit方法返回评估指标得分和收敛状
    scores, converge = model.fit(train_dl, test_dl, evaluation_epoch=evaluation_epoch)
    module_info = f"Transformer: {params['use_transformer']}, " \
                  f"TraceDifussion: {params['use_TraceDifussion']}, " \
                  f"CGLU: {params['use_CGLU']}, " \
                  f"crossModalAttenion: { params['use_crossModalAttenion']}, " \
                  f"EvolveGCN: {params['use_EvolveGCN']} "


        # 将实验结果保存到指定目录下，并记录实验唯一标识
    dump_scores(params["result_dir"], hash_id, scores, converge, params["data"], params["epoches"], params["lr"], params["gpu"], module_info)
    logging.info("Current hash_id {}".format(hash_id))

if "__main__" == __name__:
    run()

2025-04-25 16:04:37,412 P20988 INFO Using GPU...
2025-04-25 16:04:37,413 P20988 INFO Load from ./chunks\TT
2025-04-25 16:05:38,186 P20988 INFO Model Parameters: hash_id=bfb023f3, data=TT, device=cuda, lr=0.001, epoches=70
2025-04-25 16:06:26,738 P20988 INFO Epoch 1/70, training loss: 0.74033 [48.55s]
2025-04-25 16:07:08,768 P20988 INFO Epoch 2/70, training loss: 0.19566 [42.03s]
2025-04-25 16:07:49,918 P20988 INFO Epoch 3/70, training loss: 0.17215 [41.15s]
2025-04-25 16:08:33,290 P20988 INFO Epoch 4/70, training loss: 0.18001 [43.37s]
2025-04-25 16:09:16,001 P20988 INFO Epoch 5/70, training loss: 0.14360 [42.71s]
2025-04-25 16:09:59,271 P20988 INFO Epoch 6/70, training loss: 0.14718 [43.27s]
2025-04-25 16:10:42,160 P20988 INFO Epoch 7/70, training loss: 0.14127 [42.89s]
2025-04-25 16:11:26,376 P20988 INFO Epoch 8/70, training loss: 0.10565 [44.21s]
2025-04-25 16:12:08,523 P20988 INFO Epoch 9/70, training loss: 0.09727 [42.15s]
2025-04-25 16:13:13,250 P20988 INFO  testing loss: 0.11101