In [None]:
!sudo apt-get install graphviz
!apt-get install -y graphviz libgraphviz-dev pkg-config
!pip install pygraphviz
!pip install torch torchvision torchaudio torch-geometric
!pip install networkx numpy scikit-learn


In [None]:
import os
import pygraphviz as pgv
import networkx as nx
import numpy as np
import torch
import torch.nn.functional as F
from torch_geometric.data import Data, Dataset
from torch_geometric.loader import DataLoader
from torch_geometric.nn import GCNConv, global_mean_pool
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import f1_score, confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt

# 1. Read a DOT file and build a graph
def read_dot_file(dot_file_path: str) -> nx.DiGraph:
    """Load a Graphviz DOT file into a NetworkX directed graph.

    Every node and every edge of the DOT graph is copied over with a single
    ``label`` attribute holding the value of its Graphviz ``label`` attribute.

    Args:
        dot_file_path: Path of the ``.dot`` file to parse.

    Returns:
        A ``networkx.DiGraph`` keyed by the DOT node names (plain ``str``).
    """
    A = pgv.AGraph(dot_file_path)
    G = nx.DiGraph()

    try:
        # A.nodes() / A.edges() already yield objects exposing ``.attr``, so
        # the attributes are read from them directly instead of looking each
        # item up again by name (which cannot tell parallel edges apart).
        for node in A.nodes():
            # Store plain strings so the returned graph does not hold on to
            # pygraphviz objects that reference the native graph handle.
            G.add_node(str(node), label=node.attr['label'])

        for edge in A.edges():
            G.add_edge(str(edge[0]), str(edge[1]), label=edge.attr['label'])
    finally:
        # Release the underlying Graphviz graph as soon as it has been copied;
        # this function is called once per file for hundreds of files.
        A.close()

    return G


# 2. Convert a graph into the PyTorch Geometric data format
def convert_graph_to_data(G: nx.DiGraph, label: int) -> Data:
    """Turn a labelled NetworkX graph into a PyTorch Geometric ``Data`` object.

    Node ``label`` attributes are integer-encoded and used as a single float
    feature per node; edges are expressed as positions in ``G.nodes()`` order.

    Args:
        G: Graph whose nodes all carry a ``label`` attribute.
        label: Graph-level class stored in ``data.y``.

    Returns:
        ``Data`` with ``x`` of shape ``(num_nodes, 1)``, ``edge_index`` of
        shape ``(2, num_edges)`` and ``y`` of shape ``(1,)``.
    """
    nodes = list(G.nodes())
    node_labels = [G.nodes[node]['label'] for node in nodes]

    # Integer-encode the node labels.
    # NOTE(review): the encoder is fit on this graph alone, so the same integer
    # may stand for different labels in different graphs -- confirm intended.
    label_encoder = LabelEncoder()
    node_labels_encoded = label_encoder.fit_transform(node_labels)

    # Build the node -> position lookup once instead of re-materialising and
    # linearly scanning the node list twice for every edge.
    node_position = {node: position for position, node in enumerate(nodes)}
    edge_pairs = [(node_position[src], node_position[dst]) for src, dst in G.edges()]

    # reshape(-1, 2) keeps the result at shape (2, 0) for a graph without
    # edges; a bare .t() on an empty tensor would leave it at shape (0,).
    edge_index = torch.tensor(edge_pairs, dtype=torch.long).reshape(-1, 2).t().contiguous()
    # Use the numeric encoding as the (single) node feature.
    x = torch.tensor(node_labels_encoded, dtype=torch.float).view(-1, 1)
    data = Data(x=x, edge_index=edge_index)
    data.y = torch.tensor([label], dtype=torch.long)  # graph-level label
    return data
# 3. GNN model definition
class GNNModel(torch.nn.Module):
    """Two-layer GCN followed by mean pooling and a linear classifier.

    Args:
        num_node_features: Size of each node's input feature vector.
        hidden_channels: Width of both graph-convolution layers (default 16).
        num_classes: Number of output classes (default 2: normal / abnormal).
    """

    def __init__(self, num_node_features: int, hidden_channels: int = 16,
                 num_classes: int = 2):
        super().__init__()
        self.conv1 = GCNConv(num_node_features, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        self.fc = torch.nn.Linear(hidden_channels, num_classes)

    def forward(self, data):
        """Return per-graph log-probabilities of shape ``(num_graphs, num_classes)``.

        ``data`` must expose ``x``, ``edge_index`` and ``batch`` attributes.
        """
        x, edge_index, batch = data.x, data.edge_index, data.batch
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = self.conv2(x, edge_index)
        # Average node embeddings per graph for graph-level classification.
        x = global_mean_pool(x, batch)
        x = self.fc(x)
        return F.log_softmax(x, dim=1)
# 4. Dataset definition
class GraphDataset(Dataset):
    """In-memory dataset of DOT graphs labelled 0 (normal) or 1 (abnormal).

    All files are parsed and converted when the dataset is constructed.
    Normal graphs come first in ``graphs``, followed by abnormal graphs;
    ``labels`` holds the matching class ids in the same order.
    """

    def __init__(self, normal_file_paths: list, abnormal_file_paths: list):
        super().__init__()
        self.graphs = []
        self.labels = []

        # Normal files are loaded before abnormal ones.
        labelled_sources = ((normal_file_paths, 0), (abnormal_file_paths, 1))
        for paths, class_id in labelled_sources:
            for dot_path in paths:
                graph = read_dot_file(dot_path)
                self.graphs.append(convert_graph_to_data(graph, label=class_id))
                self.labels.append(class_id)

    def len(self) -> int:
        """Return the number of graphs held by the dataset."""
        return len(self.graphs)

    def get(self, idx: int) -> Data:
        """Return the converted graph stored at position ``idx``."""
        return self.graphs[idx]

# 5. Training procedure
def train(model: GNNModel, data_loader: DataLoader, optimizer: torch.optim.Optimizer) -> float:
    """Run one optimisation pass over ``data_loader``.

    Each batch is scored with negative log-likelihood loss, followed by a
    single optimizer step.

    Returns:
        Fraction of graphs classified correctly during this pass, measured
        on the outputs produced before each parameter update.
    """
    model.train()
    hits, seen = 0, 0

    for batch in data_loader:
        optimizer.zero_grad()
        log_probs = model(batch)
        labels = batch.y
        F.nll_loss(log_probs, labels).backward()
        optimizer.step()

        # Accumulate running accuracy statistics.
        predicted = log_probs.argmax(dim=1)
        hits += (predicted == labels).sum().item()
        seen += labels.size(0)

    return hits / seen

# 6. Evaluation function (with F1 score and confusion matrix)
def test(model: GNNModel, data_loader: DataLoader) -> float:
    """Evaluate ``model`` on ``data_loader``.

    Prints the binary F1 score and displays a confusion-matrix plot.

    Returns:
        Number of correct predictions divided by ``len(data_loader.dataset)``.
    """
    model.eval()
    y_true, y_pred = [], []
    correct = 0

    with torch.no_grad():
        for batch in data_loader:
            predicted = model(batch).argmax(dim=1)
            y_true += batch.y.tolist()
            y_pred += predicted.tolist()
            correct += (predicted == batch.y).sum().item()

    # Binary F1 score.
    f1 = f1_score(y_true, y_pred, average="binary")
    print(f"F1 Score: {f1:.4f}")

    # Compute and display the confusion matrix.
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
    display = ConfusionMatrixDisplay(
        confusion_matrix=cm,
        display_labels=["Normal", "Abnormal"],
    )
    display.plot(cmap=plt.cm.Blues)
    plt.title("Confusion Matrix")
    plt.show()

    accuracy = correct / len(data_loader.dataset)
    return accuracy

# 7. Main function
def main(normal_dot_files: list, abnormal_dot_files: list, *, num_epochs: int = 200,
         batch_size: int = 32, learning_rate: float = 0.01) -> None:
    """Train the GNN classifier on the given DOT files and report results.

    Prints the training accuracy after every epoch, evaluates the trained
    model, and plots the training-accuracy curve.

    Args:
        normal_dot_files: Paths of DOT files labelled normal (class 0).
        abnormal_dot_files: Paths of DOT files labelled abnormal (class 1).
        num_epochs: Number of training passes over the dataset.
        batch_size: Batch size for both the training and evaluation loaders.
        learning_rate: Learning rate passed to the Adam optimizer.
    """
    # Combine normal and abnormal graphs into one dataset.
    dataset = GraphDataset(normal_dot_files, abnormal_dot_files)
    train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    model = GNNModel(num_node_features=1)  # one encoded-label feature per node
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    accuracies = []  # per-epoch training accuracy

    # Train the model.
    for epoch in range(num_epochs):
        accuracy = train(model, train_loader, optimizer)
        accuracies.append(accuracy)
        print(f'Epoch {epoch + 1} completed - Training Accuracy: {accuracy:.4f}')

    # Evaluate on the full dataset.
    # NOTE: this is the same data the model was trained on, so the figure
    # reported below is not a held-out (generalisation) accuracy.
    test_loader = DataLoader(dataset, batch_size=batch_size)
    accuracy = test(model, test_loader)
    print(f"Overall Accuracy: {accuracy * 100:.2f}%")

    # Plot training accuracy; the axis is derived from num_epochs so that it
    # always matches the number of recorded values.
    plt.figure(figsize=(10, 5))
    plt.plot(range(1, num_epochs + 1), accuracies, marker='o')
    plt.title('Training Accuracy over Epochs')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.grid()
    plt.xticks(np.arange(1, num_epochs + 1, step=10))
    plt.ylim(0, 1)
    plt.show()

# 8. Run
# Paths of the normal (benign) graphs.
normal_dot_files = [f"/content/benign_graph/system_call_graph_{i}.dot" for i in range(1, 291)]

# Paths of the abnormal (attack) graphs, one list per attack scenario.
_hello_world_attack_files = [f"/content/attack_graph_hello_world/system_call_graph_{i}.dot" for i in range(1, 201)]
_sqldump_attack_files = [f"/content/attack_graph_sqldump/system_call_graph_{i}.dot" for i in range(1, 201)]
abnormal_dot_files = _hello_world_attack_files + _sqldump_attack_files

main(normal_dot_files, abnormal_dot_files)
