# Light GCN 논문 리뷰 & 코드 작성
**LightGCN: Simplifying and Powering Graph Convolution Network for Recommendation**  
*He et al. (2020)*  
🔗 [논문 링크](https://arxiv.org/abs/2002.02126)

---

## 핵심 수식

유저와 아이템의 초기 임베딩을 각각 $e_u^{(0)}, e_i^{(0)}$ 라고 할 때, k번째 layer에서 다음과 같이 업데이트 된다.

$$
e_u^{(k+1)} = \sum_{i\in{N_n}}\frac{1}{\sqrt{|N_u||N_i|}}e_i^{(k)}
$$

마지막에는 여러 레이어의 임베딩을 평균 or 가중합 해서 최종 임베딩을 만든다.

In [1]:
import pandas as pd
import numpy as np
import random
import json
from tqdm import tqdm
from sklearn.preprocessing import LabelEncoder
import pickle
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from scipy.sparse import coo_matrix, csr_matrix
from collections import defaultdict
from torch.utils.tensorboard import SummaryWriter

## 모델 설정 (Hyperparameter)

|항목|설정|
|:---:|:---:|
|임베딩 차원|64|
|학습률|0.001|
|optimizer|Adam|
|weight decay|1e-4(정규화 항으로 BPR에 포함)|
|negative sampling|1:1 비율로 sampling|
|배치  사이즈|1024(Amazon Books : 2048)|
|학습 epoch|max 1000, 일반적으로 200~400|
|레이어 수|3|
|초기화|Xavier uniform|

In [2]:
config = {
    'embedding_dim': 64,
    'num_layers': 3,
    'lr': 0.001,
    'batch_size': 1024,
    'epochs': 200,
    'patience': 10,
    'eval_k': 20,
    'reg_lambda': 1e-4,
    'debug': False,
    'dropout': False,
    'keep_prob': 0.6
}

In [3]:
device = (torch.device("mps") if torch.backends.mps.is_available() else torch.device("cpu"))
print("Using device:", device)

Using device: mps


## 데이터 로딩

In [4]:
# train.txt 로드 → user_item_dict
user_item_dict = defaultdict(set)
with open("data/train.txt") as f:
    for user, line in enumerate(f):
        items = list(map(int, line.strip().split()))
        for item in items[1:]:
            user_item_dict[user].add(item)

# test.txt 로드 → test_ground_truth
test_ground_truth = {}

with open("data/test.txt") as f:
    for user, line in enumerate(f):
        parts = list(map(int, line.strip().split()))
        if len(parts) < 2:
            continue  # 유저 ID만 있는 경우는 건너뛰기
        items = parts[1:]  # 유저 ID 뒤에 나오는 모든 아이템
        test_ground_truth[user] = items


# train_interactions 생성 (for model input)
train_user, train_item = [], []
for user, items in user_item_dict.items():
    for item in items:
        train_user.append(user)
        train_item.append(item)

train_interactions = torch.stack([
    torch.tensor(train_user),
    torch.tensor(train_item)
], dim=0)

# 사용자-아이템 딕셔너리로부터 유저/아이템 수 계산
all_items = set()
for items in user_item_dict.values():
    all_items.update(items)
for items in test_ground_truth.values():
    all_items.update(items)
num_items = max(all_items) + 1
num_users = len(user_item_dict)

# 통계 확인
print(f"# Users: {num_users}, # Interactions: {len(train_user)}, # Items: {num_items}")

# Users: 52643, # Interactions: 2380730, # Items: 91599


## LightGCN 모델 정의

In [5]:
class LightGCN(nn.Module):
    # 모델이 사용할 유저/아이템 수, 임베딩 차원, GCN 레이어 수, 엣지 구조를 초기화하는 부분
    def __init__(self, num_users, num_items, embedding_dim, num_layers, user_item_pairs):
        super(LightGCN, self).__init__()
        
        # 하이퍼파라미터를 멤버 변수로 저장
        self.num_users = num_users
        self.num_items = num_items
        self.embedding_dim = embedding_dim
        self.num_layers = num_layers

        # 유저, 아이템 각각에 대한 학습 가능한 임베딩 벡터 정의
        self.user_embedding = nn.Embedding(num_users, embedding_dim)
        self.item_embedding = nn.Embedding(num_items, embedding_dim)

        # 임베딩 가중치 초기화(논문에서 Xavier 초기화 사용)
        nn.init.xavier_uniform_(self.user_embedding.weight)
        nn.init.xavier_uniform_(self.item_embedding.weight)

        # 엣지 구조 저장
        self.norm_adj = self.build_adj_matrix(user_item_pairs)

    # 유저-아이템 상호작용 데이터를 기반으로 정규화된 인접 행렬을 만들기 위함
    def build_adj_matrix(self, user_item_pairs):
        num_nodes = self.num_users + self.num_items # 노드 개수 설정
        
        # bipartite graph 만들기
        rows = user_item_pairs[0].cpu().numpy()
        cols = user_item_pairs[1].cpu().numpy() + self.num_users
        
        R = coo_matrix((np.ones(len(rows)), (rows, cols)), shape=(num_nodes, num_nodes)) # 엣지 가중치를 전부 1.0으로 설정되고 있음
        
        # 양방향으로 연결(symmetric adjacency)
        adj = R + R.T

        # 정규화
        rowsum = np.array(adj.sum(1)).flatten() # degree 계산
        d_inv_sqrt = np.power(rowsum + 1e-8, -0.5) # degree matrix D 계산 -> 역루트 취해서 정규화에 사용
        d_mat_inv_sqrt = coo_matrix((d_inv_sqrt, (np.arange(num_nodes), np.arange(num_nodes))), shape=(num_nodes, num_nodes))

        norm_adj = d_mat_inv_sqrt @ adj @ d_mat_inv_sqrt # 정규화된 sparse adjacency matrix
        return self._convert_sp_mat_to_sp_tensor(norm_adj)

    # scipy.sparse.coo_matrix -> torch.sparse.FloatTensor로 바꾸는 역할
    def _convert_sp_mat_to_sp_tensor(self, mat):
        mat = mat.tocoo().astype(np.float32) # coo 형식으로 변환하고 타입을 float32로 변환
        indices = torch.from_numpy(np.vstack((mat.row, mat.col))).long() # (row, col) 인덱스를 Pytorch tensor로 변환
        values = torch.from_numpy(mat.data) # 각 엣지의 값(가중치)을 tensor로 변환
        shape = torch.Size(mat.shape) # 전체 sparse tensor의 shape 결정
        return torch.sparse_coo_tensor(indices, values, shape).to(self.user_embedding.weight.device) # sparse tensor로 생성하고 모델이 사용하는 디바이스로 이동

    # 유저/아이템 초기 임베딩을 그래프 전파를 통해 업데이트하고 평균 임베딩 변환
    def getEmbedding(self):
        all_emb = torch.cat([self.user_embedding.weight, self.item_embedding.weight]) # 유저/아이템 초기 임베딩 연결 (Layer0)
        embs = [all_emb] # 각 레이어별 임베딩 리스트 초기화
        adj = self.norm_adj
        
        # LightGCN message passing : 각 레이어마다 임베딩 업데이트
        for _ in range(self.num_layers):
            all_emb = torch.sparse.mm(adj, all_emb)
            embs.append(all_emb)
        
        embs = torch.stack(embs, dim=1) # 모든 레이어의 임베딩을 쌓고,
        light_out = torch.mean(embs, dim=1) # 레이어별 임베딩 평균
        user_emb, item_emb = light_out[:self.num_users], light_out[self.num_users:] # 유저/아이템 임베딩 분리해서 반환
        return user_emb, item_emb

    # 모든 아이템에 대한 평점 예측 점수 반환 
    def getUsersRating(self, users):
        user_emb, item_emb = self.getEmbedding() # GCN 기반으로 유저/아이템 임베딩 추출
        users_emb = user_emb[users] # 각 유저/positive/negative 아이템의 GCN 임베딩 추출
        scores = torch.matmul(users_emb, item_emb.t()) # 유저 임베딩과 전체 아이템 임베딩 간 내적하여 평점 예측 점수 추출
        return torch.sigmoid(scores) # Sigmoid를 통해 점수를 0~1 범위로 정규화

    # 유저, positive item, negative item 임베딩을 각각 추출 + 초기 임베딩도 반환
    def getEmbeddingTriple(self, users, pos_items, neg_items):
        user_emb, item_emb = self.getEmbedding()
        users_emb = user_emb[users]
        pos_emb = item_emb[pos_items]
        neg_emb = item_emb[neg_items]
        
        users_emb_0 = self.user_embedding(users)
        pos_emb_0 = self.item_embedding(pos_items)
        neg_emb_0 = self.item_embedding(neg_items)
        
        return users_emb, pos_emb, neg_emb, users_emb_0, pos_emb_0, neg_emb_0

    # GCN 레이어를 반복하면서 유저/아이템의 임베딩을 업데이트하고 마지막에 평균을 내서 최종 임베딩을 출력하는 것
    def forward(self):
        # 초기 임베딩 가져오기 (Layer 0)
        emb = torch.cat([self.user_embedding.weight, self.item_embedding.weight], dim=0)
        all_embs = [emb]
        
        adj = self.norm_adj
        
        # 메시지 전달 (Layer 수 만큼 반복)
        for _ in range(self.num_layers):
            emb = torch.sparse.mm(adj, emb)
            all_embs.append(emb)
        
        # 레이어별 임베딩 평균
        final_emb = torch.stack(all_embs, dim=0).mean(0)
        
        # 유저/아이템 임베딩 분리
        user_emb, item_emb = final_emb[:self.num_users], final_emb[self.num_users:]
        return user_emb, item_emb

## BPR Loss

$$L_{BPR}=-\sum_{(u,i,j)}log\;\sigma(\hat y_{ui}-\hat y_{uj})$$

In [6]:
def bpr_loss(model, users, pos_items, neg_items):
    # GCN 결과 + 초기 임베딩 모두 가져오기
    users_emb, pos_emb, neg_emb, user_emb_0, pos_emb_0, neg_emb_0 = model.getEmbeddingTriple(users.long(), pos_items.long(), neg_items.long())

    # BPR loss 계산
    pos_scores = torch.sum(users_emb * pos_emb, dim=1)
    neg_scores = torch.sum(users_emb * neg_emb, dim=1)
    loss = torch.mean(F.softplus(neg_scores - pos_scores))

    # 초기 임베딩에만 정규화 적용 (논문과 동일)
    reg_loss = config['reg_lambda'] * (user_emb_0.norm(2).pow(2) + pos_emb_0.norm(2).pow(2) + neg_emb_0.norm(2).pow(2)) / users.shape[0]

    return loss + reg_loss

## Early Stopping

In [7]:
class EarlyStopping:
    def __init__(self, patience=config['patience'], delta=0.0, verbose=True):
        self.patience = patience
        self.delta = delta
        self.verbose = verbose
        self.best_score = None
        self.counter = 0
        self.early_stop = False

    def __call__(self, current_score):
        if self.best_score is None or current_score > self.best_score + self.delta:
            self.best_score = current_score
            self.counter = 0
        else:
            self.counter += 1

## 평가 지표 & 함수

해당 논문에서는 Recall@20, Precision@20, NDCG@20 으로 진행하였다.

In [8]:
# Recall@K : 정답 아이템 중 상위 K개에 포함된 비율
def recall_at_k(ranked_list, ground_truth, k):
    return len(set(ranked_list[:k]) & set(ground_truth)) / len(set(ground_truth))

# Precision@K : 상위 K개 중 정답 아이템의 비율
def precision_at_k(ranked_list, ground_truth, k):
    return len(set(ranked_list[:k]) & set(ground_truth)) / k

# NDCG@K : 정답 아이템의 순위를 고려한 정밀도 지표
def ndcg_at_k(ranked_list, ground_truth, k):
    dcg = 0.0
    for i, item in enumerate(ranked_list[:k]):
        if item in ground_truth:
            dcg += 1 / np.log2(i + 2)
    idcg = sum(1 / np.log2(i + 2) for i in range(min(len(ground_truth), k)))
    return dcg / idcg if idcg > 0 else 0.0

# 전체 유저에 대해 모델 성능 평가 (배치 단위)
def evaluate_model(model, test_ground_truth, user_item_dict, k=20, batch_size=1024, silent=False, desc="Evaluating"):
    device = next(model.parameters()).device
    model.eval()
    with torch.no_grad():
        user_emb, item_emb = model()
        user_emb = user_emb.to(device)
        item_emb = item_emb.to(device)

    recall_list, precision_list, ndcg_list = [], [], []
    test_users = list(test_ground_truth.keys())

    for i in tqdm(range(0, len(test_users), batch_size), desc=desc):
        batch_users = test_users[i:i+batch_size]
        batch_user_emb = user_emb[batch_users]

        for idx, user in enumerate(batch_users):
            gt_items = test_ground_truth[user]
            train_items = user_item_dict.get(user, set())
            candidates = list((set(range(model.num_items)) - train_items) | set(gt_items))

            scores = torch.matmul(batch_user_emb[idx], item_emb[candidates].T)
            ranked_items = [candidates[i] for i in torch.topk(scores, k).indices.tolist()]

            recall_list.append(recall_at_k(ranked_items, gt_items, k))
            precision_list.append(precision_at_k(ranked_items, gt_items, k))
            ndcg_list.append(ndcg_at_k(ranked_items, gt_items, k))

    if not silent:
        print(f"Recall@{k}: {np.mean(recall_list):.4f}, Precision@{k}: {np.mean(precision_list):.4f}, NDCG@{k}: {np.mean(ndcg_list):.4f}")

    return np.mean(recall_list), np.mean(precision_list), np.mean(ndcg_list)

## 학습 루프

In [9]:
# 여러 개의 배열을 동일한 순서로 무작위 셔플
def shuffle(*arrays):
    if len(set(len(x) for x in arrays)) != 1:
        raise ValueError("All arrays must have the same length")
    idx = np.random.permutation(len(arrays[0]))
    return tuple(x[idx] for x in arrays)

In [10]:
# 유저별 (user, pos, neg) 트리플 샘플 생성
def sample_user_triplets(user_item_dict, all_items, users):
    samples = []
    for u in users:
        if not user_item_dict[u]:
            continue
        pos = random.choice(list(user_item_dict[u]))
        neg_pool = list(all_items - user_item_dict[u])
        if not neg_pool:
            continue
        neg = random.choice(neg_pool)
        samples.append((u, pos, neg))
    return samples

In [11]:
# 배치 단위로 모델을 학습하고 평균 loss 반환
def train_one_epoch(model, optimizer, user_item_dict, all_items, batch_size, device):
    model.train()
    users = list(user_item_dict.keys())
    random.shuffle(users)
    samples = sample_user_triplets(user_item_dict, all_items, users)

    users_tensor = torch.tensor([s[0] for s in samples], device=device)
    pos_tensor = torch.tensor([s[1] for s in samples], device=device)
    neg_tensor = torch.tensor([s[2] for s in samples], device=device)

    users_tensor, pos_tensor, neg_tensor = shuffle(users_tensor, pos_tensor, neg_tensor)

    total_loss = 0
    num_batches = len(users_tensor) // batch_size + 1

    for start in range(0, len(users_tensor), batch_size):
        end = start + batch_size
        batch_users = users_tensor[start:end]
        batch_pos = pos_tensor[start:end]
        batch_neg = neg_tensor[start:end]

        loss = bpr_loss(model, batch_users, batch_pos, batch_neg)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    return total_loss / num_batches

In [12]:
# 전체 학습 과정을 실행하고, TensorBoard 로그 및 EarlyStopping 포함
def train_lightgcn(model, interactions, optimizer, test_ground_truth, user_item_dict,
                   epochs=config['epochs'], batch_size=config['batch_size'], k=config['eval_k'], patience=config['patience']):
    writer = SummaryWriter()
    topks = [k]

    # interaction을 기반으로 유저-아이템 딕셔너리 구성
    user_item_dict.clear()
    for u, i in zip(interactions[0], interactions[1]):
        user_item_dict[u.item()].add(i.item())

    all_items = set(range(model.num_items))
    early_stopping = EarlyStopping(patience=patience)
    device = next(model.parameters()).device

    for epoch in range(epochs):
        print(f"[Epoch {epoch+1}] Training start...")
        avg_loss = train_one_epoch(model, optimizer, user_item_dict, all_items, batch_size, device)
        print(f"Epoch {epoch + 1}: Loss = {avg_loss:.4f}")
        writer.add_scalar("Loss/train", avg_loss, epoch)

        # 평가 및 로깅
        recall_all = {}
        for topk in topks:
            recall, prec, ndcg = evaluate_model(model, test_ground_truth, user_item_dict, topk, silent=True)
            writer.add_scalars(f"Metrics@{topk}", {"Recall": recall, "Precision": prec, "NDCG": ndcg}, epoch)
            recall_all[topk] = recall

        # EarlyStopping 체크
        recall = recall_all[k]
        early_stopping(recall)
        if early_stopping.early_stop:
            print(f"Early stopping at epoch {epoch + 1}")
            break

## 모델 학습 및 평가 실행

In [13]:
# 모델 초기화
model = LightGCN(num_users, num_items, embedding_dim=config['embedding_dim'], num_layers=config['num_layers'], user_item_pairs=train_interactions)


# Optimizer 설정
optimizer = torch.optim.Adam(model.parameters(), lr=config['lr'])

# 모델 학습
train_lightgcn(
    model,
    train_interactions,
    optimizer,
    test_ground_truth,
    user_item_dict,
    epochs=config['epochs'],
    batch_size=config['batch_size'],
    k=config['eval_k'],
    patience=config['patience']
)

# 평가 성능 (K=20)
evaluate_model(model, test_ground_truth, user_item_dict, k=config['eval_k'])

[Epoch 1] Training start...
Epoch 1: Loss = 0.6922


Evaluating: 100%|██████████| 52/52 [09:12<00:00, 10.62s/it]


[Epoch 2] Training start...
Epoch 2: Loss = 0.6698


Evaluating: 100%|██████████| 52/52 [25:08<00:00, 29.00s/it] 


[Epoch 3] Training start...
Epoch 3: Loss = 0.6134


Evaluating: 100%|██████████| 52/52 [32:12<00:00, 37.15s/it]  


[Epoch 4] Training start...
Epoch 4: Loss = 0.5537


Evaluating: 100%|██████████| 52/52 [09:18<00:00, 10.74s/it]


[Epoch 5] Training start...
Epoch 5: Loss = 0.5061


Evaluating: 100%|██████████| 52/52 [09:36<00:00, 11.09s/it]


[Epoch 6] Training start...


KeyboardInterrupt: 

Recall@20: 0.0687, Precision@20: 0.0034, NDCG@20: 0.0260