In [None]:
pip install matplotlib pandas numpy scikit-learn torch_geometric

Note: you may need to restart the kernel to use updated packages.


In [None]:
#pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu124

In [None]:
import os
import torch
import pandas as pd
from torch_geometric.data import Data, Dataset

# 그래프 데이터를 로드하는 함수
def get_graph_data(edges_file, nodes_file):
    """
    edges.txt와 nodes.csv 파일에서 그래프 데이터를 로드합니다.

    Args:
        edges_file (str): 엣지 데이터 파일 경로.
        nodes_file (str): 노드 데이터 파일 경로.

    Returns:
        x (Tensor): 노드 특징 텐서.
        edge_index (Tensor): 엣지 인덱스 텐서.
    """
    # 노드와 엣지 데이터 로드
    nodes = pd.read_csv(nodes_file)
    edges = pd.read_csv(edges_file, sep=r'\s+', header=None)

    # 'id' 컬럼이 존재하는지 확인
    if 'id' not in nodes.columns:
        raise ValueError("nodes.csv 파일에 'id' 컬럼이 없습니다.")

    # 노드 ID를 인덱스로 매핑
    node_id_to_idx = {node_id: idx for idx, node_id in enumerate(nodes['id'])}
    edges[0] = edges[0].map(node_id_to_idx)
    edges[1] = edges[1].map(node_id_to_idx)

    # 매핑 중 잘못된 데이터가 없는지 확인
    if edges.isnull().values.any():
        raise ValueError("엣지 데이터에 잘못된 노드 ID가 포함되어 있습니다.")

    # 노드 데이터를 PyTorch 텐서로 변환
    x = torch.tensor(nodes['id'].values, dtype=torch.float).unsqueeze(1)  # 차원 추가

    # 엣지 데이터를 PyTorch 텐서로 변환
    edge_index = torch.tensor(edges.values.T, dtype=torch.long)

    return x, edge_index


# 그래프 데이터셋 클래스 정의
class GraphDataset(Dataset):
    """
    그래프 데이터를 PyTorch Geometric의 Dataset 형식으로 정의합니다.
    """
    def __init__(self, root, transform=None, pre_transform=None):
        """
        데이터셋 초기화.

        Args:
            root (str): 데이터셋 루트 디렉토리 경로.
            transform (callable, optional): 데이터 변환 함수.
            pre_transform (callable, optional): 데이터 전처리 함수.
        """
        super().__init__(root, transform, pre_transform)
        self.graph_files = []  # 그래프 파일 경로 저장 리스트
        self.labels = []  # 그래프 라벨 저장 리스트

        # 루트 디렉토리의 각 폴더를 탐색하여 그래프와 라벨 수집
        for label_dir in os.listdir(root):
            if "Non_Conspiracy" in label_dir:
                label = 0  # Non-Conspiracy 폴더의 라벨
            elif "Conspiracy" in label_dir:
                label = 1  # Conspiracy 폴더의 라벨
            else:
                raise ValueError(f"알 수 없는 라벨 디렉토리: {label_dir}")

            # 각 서브 디렉토리를 탐색하여 그래프 데이터 수집
            subdir = os.path.join(root, label_dir)
            for graph_index in os.listdir(subdir):
                graph_path = os.path.join(subdir, graph_index)
                if os.path.isdir(graph_path):  # 디렉토리인지 확인
                    self.graph_files.append(graph_path)  # 그래프 경로 저장
                    self.labels.append(label)  # 라벨 저장

        # 라벨을 PyTorch 텐서로 변환
        self.labels = torch.tensor(self.labels, dtype=torch.long)

    def len(self):
        """
        데이터셋의 크기를 반환합니다.
        """
        return len(self.graph_files)

    def get(self, idx):
        """
        데이터셋의 특정 인덱스에 해당하는 그래프 데이터를 반환합니다.

        Args:
            idx (int): 그래프 인덱스.

        Returns:
            Data: PyTorch Geometric의 Data 객체.
        """
        graph_path = self.graph_files[idx]
        label = self.labels[idx]

        # 엣지 및 노드 파일 경로 설정
        edges_file = os.path.join(graph_path, "edges.txt")
        nodes_file = os.path.join(graph_path, "nodes.csv")

        # 엣지와 노드 파일이 비어 있는 경우 처리
        if not os.path.exists(edges_file) or os.stat(edges_file).st_size == 0:
            edges = pd.DataFrame(columns=[0, 1])  # 빈 엣지 데이터 생성
        else:
            edges = pd.read_csv(edges_file, sep=r'\s+', header=None)

        if not os.path.exists(nodes_file) or os.stat(nodes_file).st_size == 0:
            nodes = pd.DataFrame({'id': [0]})  # 기본 노드 생성
        else:
            nodes = pd.read_csv(nodes_file)

        # 'id' 컬럼이 없는 경우 기본 값 추가
        if 'id' not in nodes.columns:
            nodes['id'] = range(len(nodes))

        # 노드 ID를 인덱스로 매핑
        node_id_to_idx = {node_id: idx for idx, node_id in enumerate(nodes['id'])}
        edges[0] = edges[0].map(node_id_to_idx)
        edges[1] = edges[1].map(node_id_to_idx)

        # 잘못된 엣지 데이터 제거
        edges = edges.dropna().astype(int)

        # PyTorch 텐서로 변환
        x = torch.tensor(nodes['id'].values, dtype=torch.float).unsqueeze(1)
        edge_index = torch.tensor(edges.values.T, dtype=torch.long)

        # PyTorch Geometric의 Data 객체 반환
        return Data(x=x, edge_index=edge_index, y=label)


# 데이터셋 경로 설정
data_root = "dataset"

# 데이터셋 생성
dataset = GraphDataset(root=data_root)

# 데이터셋 크기와 라벨 출력
print(f"Total graphs: {len(dataset)}")  # 전체 그래프 수
print(f"Labels: {dataset.labels.unique().tolist()}")  # 라벨 분포


In [None]:
import torch
from sklearn.model_selection import train_test_split
from torch_geometric.data import DataLoader
import numpy as np

# 데이터셋을 클래스 비율에 맞게 분할
def split_dataset_by_class(dataset, test_size=0.2, val_size=0.1, random_state=42):
    # 클래스별 인덱스 수집
    class_indices = {label: [] for label in np.unique([data.y.item() for data in dataset])}
    for idx, data in enumerate(dataset):
        class_indices[data.y.item()].append(idx)

    # 각 클래스별로 학습, 검증, 테스트 인덱스 분할
    train_indices, val_indices, test_indices = [], [], []
    for label, indices in class_indices.items():
        train_idx, test_idx = train_test_split(indices, test_size=test_size, random_state=random_state)
        train_idx, val_idx = train_test_split(train_idx, test_size=val_size / (1 - test_size), random_state=random_state)
        train_indices.extend(train_idx)
        val_indices.extend(val_idx)
        test_indices.extend(test_idx)

    return train_indices, val_indices, test_indices

# 시드 고정 (재현 가능한 결과를 위해 사용)
random_state = 42
torch.manual_seed(random_state)
np.random.seed(random_state)

# 데이터셋 분할
train_idx, val_idx, test_idx = split_dataset_by_class(dataset, test_size=0.2, val_size=0.1, random_state=random_state)

# 분할된 인덱스로 데이터셋 생성
train_dataset = dataset[train_idx]
val_dataset = dataset[val_idx]
test_dataset = dataset[test_idx]

# 데이터로더 생성
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)  # 학습용 데이터 섞음
val_loader = DataLoader(val_dataset, batch_size=32)                   # 검증 데이터 고정
test_loader = DataLoader(test_dataset, batch_size=32)                 # 테스트 데이터 고정

# 데이터셋 크기 출력
print(f"Train size: {len(train_dataset)}, Validation size: {len(val_dataset)}, Test size: {len(test_dataset)}")


In [None]:
import torch
import random
import numpy as np
from sklearn.metrics import f1_score, roc_auc_score
from torch_geometric.nn import GCNConv, global_mean_pool
from torch.optim.lr_scheduler import StepLR
import torch.nn as nn


# 시드 고정 함수
def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False

# 시드 고정
SEED = 42
set_seed(SEED)
print(f"Seed set to: {SEED}")

# CUDA 확인 및 설정
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# GCN 분류기 정의
class GCNClassifier(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, 
                 num_layers=2, dropout_rate=0.5, use_batch_norm=True, use_residual=False):
        super().__init__()
        self.num_layers = num_layers
        self.use_batch_norm = use_batch_norm
        self.use_residual = use_residual
        self.dropout_rate = dropout_rate

        # GCN 레이어 정의
        self.convs = nn.ModuleList()
        self.convs.append(GCNConv(input_dim, hidden_dim))
        for _ in range(num_layers - 1):
            self.convs.append(GCNConv(hidden_dim, hidden_dim))

        # 배치 정규화 레이어 정의
        if use_batch_norm:
            self.bns = nn.ModuleList([nn.BatchNorm1d(hidden_dim) for _ in range(num_layers)])

        # 드롭아웃 정의
        self.dropout = nn.Dropout(dropout_rate)

        # 완전 연결 레이어
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, data):
        x, edge_index, batch = data.x, data.edge_index, data.batch
        residual = None

        for i, conv in enumerate(self.convs):
            x = conv(x, edge_index).relu()
            if self.use_batch_norm:
                x = self.bns[i](x)
            if self.use_residual and residual is not None:
                x += residual
            x = self.dropout(x)
            residual = x

        x = global_mean_pool(x, batch)
        x = self.fc(x)
        return x

# 모델, 옵티마이저 및 손실 함수 초기화


# 클래스 분포 확인 및 클래스 가중치 설정
labels = [data.y.item() for data in dataset]
class_counts = torch.tensor([labels.count(0), labels.count(1), labels.count(2)], dtype=torch.float)  # 클래스 3개
class_weights = 1.0 / class_counts  # 클래스 가중치 계산
criterion = nn.CrossEntropyLoss(weight=class_weights.to(device))  # 가중치 적용된 손실 함수

# GCN 분류기 초기화
input_dim = dataset[0].x.shape[1]  # 노드 특징 차원
model = GCNClassifier(input_dim=input_dim, hidden_dim=128, output_dim=2).to(device)  # 출력 차원 3으로 설정
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
scheduler = StepLR(optimizer, step_size=20, gamma=0.5)
# 얼리 스토핑 설정
patience = 10
best_val_loss = float('inf')
patience_counter = 0

# Training Loop: 학습 루프
def train():
    model.train()  # 모델을 학습 모드로 설정
    total_loss = 0  # 총 손실 초기화
    all_preds = []  # 전체 예측값 저장 리스트
    all_labels = []  # 전체 실제 라벨 저장 리스트

    for data in train_loader:  # 학습 데이터 로더에서 배치 단위로 데이터 가져오기
        data = data.to(device)  # 데이터를 GPU 또는 CPU로 이동
        optimizer.zero_grad()  # 이전 배치의 그래디언트 초기화
        out = model(data)  # 모델에 데이터를 전달하여 출력값 계산
        loss = criterion(out, data.y)  # 출력값과 실제 라벨을 비교하여 손실 계산
        loss.backward()  # 손실에 대한 그래디언트 계산
        optimizer.step()  # 옵티마이저를 사용하여 모델 파라미터 업데이트
        total_loss += loss.item()  # 배치 손실 값을 총 손실에 더함

        # 예측값과 실제 라벨 저장
        pred = out.argmax(dim=1)  # 예측값(가장 높은 확률의 클래스 선택)
        all_preds.extend(pred.cpu().numpy())  # 예측값을 CPU로 이동 후 저장
        all_labels.extend(data.y.cpu().numpy())  # 실제 라벨을 CPU로 이동 후 저장
    
    # 정확도와 F1-score 계산
    accuracy = (torch.tensor(all_preds) == torch.tensor(all_labels)).sum().item() / len(all_labels)
    f1 = f1_score(all_labels, all_preds, average="weighted")  # 가중치가 있는 F1-score 계산
    return total_loss / len(train_loader), accuracy, f1  # 평균 손실, 정확도, F1-score 반환
# Validation/Test Loop: 검증 및 테스트 루프
def validate(loader):
    model.eval()  # 평가 모드로 설정
    total_loss = 0
    all_preds = []
    all_labels = []
    all_probs = []

    with torch.no_grad():
        for data in loader:
            data = data.to(device)  # 데이터를 GPU/CPU로 이동
            out = model(data)  # 모델 출력 계산
            loss = criterion(out, data.y)  # 손실 계산
            total_loss += loss.item()

            # 예측값과 확률 저장
            pred = out.argmax(dim=1)
            prob = out.softmax(dim=1)  # 모든 클래스의 확률 계산
            all_preds.extend(pred.cpu().numpy())
            all_labels.extend(data.y.cpu().numpy())
            all_probs.extend(prob.cpu().numpy())

    # 정확도와 F1-score 계산
    accuracy = (torch.tensor(all_preds) == torch.tensor(all_labels)).sum().item() / len(all_labels)
    f1 = f1_score(all_labels, all_preds, average="weighted")  # F1-score 계산

    # AUC 계산 (다중 클래스)
    auc = roc_auc_score(all_labels, all_probs, multi_class="ovr")  # One-vs-Rest 방식으로 AUC 계산
    return total_loss / len(loader), accuracy, f1, auc

# Main Training Script 유지
for epoch in range(50):  # 최대 50 에포크 동안 학습
    train_loss, train_acc, train_f1 = train()  # 학습
    val_loss, val_acc, val_f1, val_auc = validate(val_loader)  # 검증
    scheduler.step()  # 학습률 감소

    # 얼리 스토핑 로직
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        patience_counter = 0
        torch.save(model.state_dict(), "best_model.pth")  # 최적 모델 저장
        print(f"Epoch {epoch + 1:03d} | Saving best model")
    else:
        patience_counter += 1

    if patience_counter >= patience:
        print("Early stopping triggered")
        break

    # 에포크 결과 출력
    print(f"Epoch {epoch + 1:03d} | Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Train F1: {train_f1:.4f} | "
          f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}, Val F1: {val_f1:.4f}, Val AUC: {val_auc:.4f}")

# 테스트 데이터셋 평가
model.load_state_dict(torch.load("best_model.pth"))  # 최적 모델 로드
test_loss, test_acc, test_f1, test_auc = validate(test_loader)  # 테스트 데이터셋 검증
print(f"Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.4f}, Test F1: {test_f1:.4f}, Test AUC: {test_auc:.4f}")