In [None]:
# Import all required libraries
import random
import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.nn.functional as F
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score, accuracy_score, confusion_matrix
from torch.nn import Sequential as Seq, Linear, ReLU
from torch.nn.functional import binary_cross_entropy_with_logits as BCEWithLogits
from torch_geometric.datasets import FacebookPagePage
from torch_geometric.nn import MessagePassing, GCNConv
from torch_geometric.utils import train_test_split_edges, negative_sampling

In [None]:
# Load the Facebook Page-Page graph; files are stored under the current directory.
dataset = FacebookPagePage(root=".")

# The dataset holds a single graph object.
data = dataset[0]
if data.edge_attr is None:
    # Give every edge a unit attribute when the dataset provides none.
    data.edge_attr = torch.ones(data.edge_index.shape[1], )

# Seed before the random edge split so train/val/test edges are identical on
# every "Restart & Run All" (assumes the split draws from torch's global RNG).
torch.manual_seed(42)

# Split edges into train / validation / test sets (adds *_pos_edge_index and
# *_neg_edge_index attributes that the later cells read).
data = train_test_split_edges(data)

print(data)

In [None]:
class LinkPredictor(torch.nn.Module):
    """Score candidate edges from the embeddings of their two endpoints.

    The two endpoint embeddings are concatenated and passed through a single
    linear layer followed by a sigmoid, so the output is a probability in
    [0, 1] (not a logit).

    Args:
        in_channels: Size of one node embedding.
    """

    def __init__(self, in_channels):
        super().__init__()
        # Two embeddings are concatenated, hence 2 * in_channels input features.
        self.lin = torch.nn.Linear(in_channels * 2, 1)

    def forward(self, x_i, x_j):
        """Return one link probability per (x_i, x_j) row pair.

        Args:
            x_i: Embeddings of the source nodes, shape (..., in_channels).
            x_j: Embeddings of the target nodes, same shape as ``x_i``.

        Returns:
            Tensor of probabilities with the trailing singleton dim removed.
        """
        pair = torch.cat([x_i, x_j], dim=-1)
        # Squeeze only the last dim: a bare squeeze() would also collapse a
        # batch of one edge to a 0-d tensor, which then mismatches the
        # per-edge label vector in the loss.
        return torch.sigmoid(self.lin(pair)).squeeze(-1)

In [None]:
class GCN(torch.nn.Module):
    """Stack of GCNConv layers that turns node features into node embeddings.

    The stack is: one input layer (in -> hidden), ``num_layers - 2`` hidden
    layers (hidden -> hidden) and one output layer (hidden -> out).

    Args:
        in_channels: Number of input node features.
        hidden_channels: Width of every intermediate layer.
        out_channels: Size of the produced embeddings.
        num_layers: Requested depth; values below 2 still yield two layers
            because the input and output layers are always created.
        dropout: Dropout probability applied after every non-final layer.
        activation_func: Callable applied after every non-final layer.
    """

    def __init__(self, in_channels, hidden_channels, out_channels, num_layers, dropout=0.1057085140333304, activation_func=torch.nn.SELU()):
        super().__init__()
        self.layers = torch.nn.ModuleList()
        self.activation_func = activation_func
        self.dropout = torch.nn.Dropout(dropout)

        # Channel width at every layer boundary: input, hidden (repeated), output.
        widths = [in_channels, hidden_channels] + [hidden_channels] * (num_layers - 2) + [out_channels]
        self.layers.extend(
            [GCNConv(fan_in, fan_out) for fan_in, fan_out in zip(widths[:-1], widths[1:])]
        )

    def forward(self, x, edge_index):
        """Propagate ``x`` over ``edge_index`` through every layer.

        Activation and dropout follow each layer except the last one, whose
        raw output is returned.
        """
        *inner_layers, final_layer = self.layers
        for conv in inner_layers:
            x = self.dropout(self.activation_func(conv(x, edge_index)))
        return final_layer(x, edge_index)

In [None]:
def train():
    """Run one optimisation step on the training edges.

    Uses the module-level ``model``, ``link_predictor``, ``optimizer`` and
    ``data``. One negative edge is sampled per positive training edge.

    Returns:
        float: Sum of the positive-edge and negative-edge BCE losses.
    """
    model.train()
    link_predictor.train()

    pos_edge_index = data.train_pos_edge_index
    # Draw as many non-edges as there are positive training edges.
    neg_edge_index = negative_sampling(
        edge_index=pos_edge_index,
        num_nodes=data.num_nodes,
        num_neg_samples=pos_edge_index.size(1),
    )

    optimizer.zero_grad()

    z = model(data.x, pos_edge_index)
    pos_pred = link_predictor(z[pos_edge_index[0]], z[pos_edge_index[1]])
    neg_pred = link_predictor(z[neg_edge_index[0]], z[neg_edge_index[1]])

    # link_predictor already applies a sigmoid, so its outputs are
    # probabilities: use plain BCE (as validate()/test() do) rather than
    # BCEWithLogitsLoss, which would apply a second sigmoid.
    # ones_like / zeros_like place the targets on the predictions' device and
    # dtype, removing the dependency on an undefined global `device`.
    pos_loss = F.binary_cross_entropy(pos_pred, torch.ones_like(pos_pred))
    neg_loss = F.binary_cross_entropy(neg_pred, torch.zeros_like(neg_pred))
    loss = pos_loss + neg_loss
    loss.backward()
    optimizer.step()

    return loss.item()

In [None]:
def validate():
    """Evaluate the current model on the validation edges.

    Uses the module-level ``model``, ``link_predictor`` and ``data``.
    Predictions are thresholded at 0.5 for the class-based metrics.

    Returns:
        tuple: ``(loss, precision, recall, f1, roc_auc)`` where ``loss`` is the
        sum of the positive-edge and negative-edge BCE losses.
    """
    model.eval()
    link_predictor.eval()

    pos_edges = data.val_pos_edge_index
    neg_edges = data.val_neg_edge_index

    with torch.no_grad():
        # Encode over the training graph, exactly as train() does. Message
        # passing over the validation positives would hand the encoder the
        # very edges it is being asked to predict.
        z = model(data.x, data.train_pos_edge_index)
        pos_pred = link_predictor(z[pos_edges[0]], z[pos_edges[1]])
        neg_pred = link_predictor(z[neg_edges[0]], z[neg_edges[1]])

        # Targets follow the predictions' device/dtype.
        pos_target = torch.ones_like(pos_pred)
        neg_target = torch.zeros_like(neg_pred)

        pos_loss = F.binary_cross_entropy(pos_pred, pos_target)
        neg_loss = F.binary_cross_entropy(neg_pred, neg_target)
        loss = pos_loss + neg_loss

        labels = torch.cat([pos_target, neg_target]).cpu().numpy()
        preds = torch.cat([pos_pred, neg_pred]).cpu().numpy()
        preds_class = (preds > 0.5).astype(int)

        precision = precision_score(labels, preds_class)
        recall = recall_score(labels, preds_class)
        f1 = f1_score(labels, preds_class)
        # ROC AUC is computed on the raw probabilities, not the hard classes.
        roc_auc = roc_auc_score(labels, preds)

    return loss.item(), precision, recall, f1, roc_auc

In [None]:
# Build the encoder, the edge scorer and a single optimizer over both.
out_channels = 8
model = GCN(
    in_channels=data.num_node_features,
    hidden_channels=16,
    out_channels=out_channels,
    num_layers=3,
    activation_func=torch.nn.SELU(),
)
# The predictor consumes the encoder's embeddings, so the sizes must match.
link_predictor = LinkPredictor(out_channels)
optimizer = torch.optim.AdamW(
    [*model.parameters(), *link_predictor.parameters()],
    lr=0.019324891788175074,
    weight_decay=0.02335770445049197,
)

# Short training run with per-epoch validation metrics.
for epoch in range(6):
    train_loss = train()
    val_loss, val_precision, val_recall, val_f1, val_roc_auc = validate()
    print(f"Epoch: {epoch}, Training Loss: {train_loss}, Validation Loss: {val_loss}, Precision: {val_precision}, Recall: {val_recall}, F1: {val_f1}, ROC AUC: {val_roc_auc}")

In [None]:
def test(threshold=0.65):
    """Evaluate the current model on the held-out test edges and print a report.

    Uses the module-level ``model``, ``link_predictor`` and ``data``.

    Args:
        threshold: Probability above which an edge is classified as positive.
            Defaults to 0.65, the value this notebook has always used for the
            test report (validate() uses 0.5).

    Returns:
        tuple: ``(loss, precision, recall, f1, roc_auc, accuracy,
        confusion_mat, labels, preds)`` where ``labels`` and ``preds`` are
        NumPy arrays with positives first, then negatives.
    """
    model.eval()
    link_predictor.eval()

    pos_edges = data.test_pos_edge_index
    neg_edges = data.test_neg_edge_index

    with torch.no_grad():
        # Encode over the training graph, exactly as train() does. Message
        # passing over the test positives would leak the edges being scored
        # into the embeddings.
        z = model(data.x, data.train_pos_edge_index)
        pos_pred = link_predictor(z[pos_edges[0]], z[pos_edges[1]])
        neg_pred = link_predictor(z[neg_edges[0]], z[neg_edges[1]])

        # Targets follow the predictions' device/dtype.
        pos_target = torch.ones_like(pos_pred)
        neg_target = torch.zeros_like(neg_pred)

        pos_loss = F.binary_cross_entropy(pos_pred, pos_target)
        neg_loss = F.binary_cross_entropy(neg_pred, neg_target)
        loss = pos_loss + neg_loss

        # Ground-truth labels of the positive and negative edges.
        labels = torch.cat([pos_target, neg_target]).cpu().numpy()

        # Predicted probabilities.
        preds = torch.cat([pos_pred, neg_pred]).cpu().numpy()

        # Turn probabilities into hard classes for the class-based metrics.
        preds_class = (preds > threshold).astype(int)

        precision = precision_score(labels, preds_class)
        recall = recall_score(labels, preds_class)
        f1 = f1_score(labels, preds_class)
        roc_auc = roc_auc_score(labels, preds)
        accuracy = accuracy_score(labels, preds_class)
        confusion_mat = confusion_matrix(labels, preds_class)

    print("Test Loss: {:.4f}".format(loss.item()))
    print("Precision: {:.4f}".format(precision))
    print("Recall: {:.4f}".format(recall))
    print("F1 Score: {:.4f}".format(f1))
    print("ROC AUC Score: {:.4f}".format(roc_auc))
    print("Accuracy: {:.4f}".format(accuracy))
    print("Confusion Matrix:")
    print(confusion_mat)

    return loss.item(), precision, recall, f1, roc_auc, accuracy, confusion_mat, labels, preds


In [None]:
# Evaluate on the test split and keep every returned value for later cells.
(
    test_loss,
    precision,
    recall,
    f1,
    roc_auc,
    accuracy,
    confusion_mat,
    labels,
    preds,
) = test()

In [None]:
# Performance visualisation
# Plots the values returned by test() (precision, recall, f1, roc_auc,
# accuracy, confusion_mat) instead of numbers pasted from an earlier run, so
# the figures always match the model that was just evaluated.
# Neither `f1_score` nor `labels` is rebound here: `f1_score` is the sklearn
# function that validate()/test() call, and `labels` holds the test labels.
metric_names = ['Precision', 'Recall', 'F1 Score', 'ROC AUC Score', 'Accuracy']
metric_values = [precision, recall, f1, roc_auc, accuracy]

# Bar chart of the scalar metrics.
fig, ax = plt.subplots(figsize=(8, 6))
ax.bar(metric_names, metric_values, color='skyblue')
ax.set_ylim(0, 1)  # all plotted metrics lie in [0, 1]
ax.set_title('Evaluation Metrics')
ax.set_xlabel('Metrics')
ax.set_ylabel('Scores')
plt.show()

# Heatmap of the confusion matrix (rows: true labels, columns: predictions).
fig, ax = plt.subplots(figsize=(6, 6))
sns.heatmap(confusion_mat, annot=True, fmt="d", cmap='Blues', ax=ax)
ax.set_title('Confusion Matrix')
ax.set_xlabel('Predicted Labels')
ax.set_ylabel('True Labels')
plt.show()

In [None]:
# Hyperparameter tuning
# Fix the random seeds of Python, NumPy and PyTorch, and put cuDNN into
# deterministic mode, before the search starts.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
# Trade cuDNN autotuning speed for run-to-run determinism.
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

def objective(params):
    """Hyperopt objective: train one configuration and report its validation ROC AUC.

    Args:
        params: Mapping with the keys 'num_layers', 'hidden_channels',
            'dropout', 'lr' and 'weight_decay'.

    Returns:
        dict: 'loss' is the negated ROC AUC of the last epoch that ran (the
        search minimises it), plus 'status', 'eval_time' and the per-epoch
        loss curves under 'other_stuff'.

    Side effects:
        Rebinds the module-level ``model``, ``link_predictor`` and
        ``optimizer`` to this trial's objects.
    """
    # train() and validate() read model / link_predictor / optimizer from
    # module scope. Binding the trial's objects to those globals is what makes
    # each trial train its own configuration; as locals they were never used.
    global model, link_predictor, optimizer

    # Hyperparameters (quantised values arrive as floats, hence int()).
    num_layers = int(params['num_layers'])
    hidden_channels = int(params['hidden_channels'])
    dropout = params['dropout']
    lr = params['lr']
    weight_decay = params['weight_decay']

    # Build the model; the predictor's input size must equal the embedding size.
    embedding_dim = 8
    model = GCN(in_channels=data.num_node_features, hidden_channels=hidden_channels, out_channels=embedding_dim, num_layers=num_layers, activation_func=F.leaky_relu, dropout=dropout)
    link_predictor = LinkPredictor(embedding_dim)
    optimizer = torch.optim.AdamW(list(model.parameters()) + list(link_predictor.parameters()), lr=lr, weight_decay=weight_decay)

    # Early stopping
    patience = 5
    best_val_loss = float('inf')
    best_epoch = 0

    # Train and validate
    train_losses = []
    val_losses = []
    for epoch in range(20):
        train_loss = train()
        val_loss, val_precision, val_recall, val_f1, val_roc_auc = validate()
        print(f"Epoch: {epoch+1}, Train Loss: {train_loss}, Validation Loss: {val_loss}")

        train_losses.append(train_loss)
        val_losses.append(val_loss)

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_epoch = epoch
        elif epoch - best_epoch >= patience:
            # No improvement for `patience` consecutive epochs.
            print("Early stopping!")
            break

    # NOTE(review): STATUS_OK is not imported in the visible notebook;
    # presumably `from hyperopt import STATUS_OK` - confirm the import cell.
    return {
        'loss': -val_roc_auc,
        'status': STATUS_OK,
        'eval_time': time.time(),
        'other_stuff': {'train_losses': train_losses, 'val_losses': val_losses}  # Return more information
        }

# Search space
# NOTE(review): hp, Trials, fmin and tpe are not imported in the visible
# notebook; presumably `from hyperopt import fmin, tpe, hp, Trials` - confirm.
space = dict(
    # Quantised to whole numbers; objective() casts them to int.
    num_layers=hp.quniform('num_layers', 2, 4, 1),
    hidden_channels=hp.quniform('hidden_channels', 8, 16, 1),
    dropout=hp.uniform('dropout', 0, 0.5),
    # loguniform bounds are natural-log exponents: exp(-5) .. exp(-2).
    lr=hp.loguniform('lr', -5, -2),
    weight_decay=hp.loguniform('weight_decay', -5, -2),
)

# Run the optimisation: 100 TPE-guided evaluations, recorded in `trials`.
trials = Trials()
best = fmin(
    objective,
    space,
    algo=tpe.suggest,
    max_evals=100,
    trials=trials,
)