# KGCN 논문 리뷰 & 코드 작성
**KGCN: Simplifying and Powering Graph Convolution Network for Recommendation**  
*Hongwei Wang et al. (2019)*  
🔗 [논문 링크](https://arxiv.org/abs/1904.12575)

## 3.1 Problem Formulation

*목표 : 사용자 u가 아이템 v에 관심 있을지를 예측하는 함수*
$$\hat{y}_{uv} = F(u,v | Θ, Y, G)$$
- Y : 사용자-아이템 상호작용 행렬 (예: 클릭, 평가)
- G : 지식 그래프 (KG), 삼중항(triple : head, relation, tail)의 집합

## 3.2 KGCN Layer (모델 구성 요소)

*1. 관계 중요도 계산 (사용자 u, 관계 r)*
$$\pi_{ur} = g(u,r)$$

*2. 이웃 노드 집계 (attention 가중치 포함)*
$$v_u^{N(v)} = \sigma_{e\in{N(v)}}\tilde{\pi}_{ur}⋅e$$
$$\tilde{\pi}_{ur}=\frac{exp(\pi_{ur})}{\sum\nolimits_{e'}exp(\pi_{ur'})}$$

*3. Aggregation 방식 3가지*
- Sum : $ReLU(W(v+v_u^{N(v)})+b)$
- Concat : $ReLU(W[v;v_u^{N(v)}]+b)$
- Neighbor : $ReLU(Wv_u^{N(v)}+b)$

## 3.3 Learning Algorithm (학습 알고리즘)

*반복 구조*
KGCN은 여러 계층(hop)으로 구성되어 '0-hop → 1-hop → ...'와 같은 형식으로 이웃 정보를 반복적으로 전파 및 집계
$$\hat{y}_{uv}=f(u, v_u^{(h)})$$

*학습 손실 함수*
- Cross Entropy + Negative Sampling + L2 정규화 포함
$$L = \sum_{u}\begin{bmatrix}\sum_{v:y_{uv}=1}J(y_{uv}, \hat{y}_{uv})-\sum_{i=1}^{T_u}\mathbb{E}_{vi~P(v)}J(0,\hat{y}_(uvi))\end{bmatrix}+\lambda||F||_2^2$$

In [11]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from tqdm import tqdm
import argparse
import matplotlib.pyplot as plt

In [12]:
device = (torch.device("mps") if torch.backends.mps.is_available() else torch.device("cpu"))
print("Using device:", device)

Using device: mps


## Book - Crossing 모델 설정 (Hyperparameter)

|항목|설정|
|:---:|:---:|
임베딩 차원 | 64
학습률 | 0.0002
Optimizer | Adam
정규화 계수 (L2) | 2e-5
Negative Sampling | 1:1 비율로 샘플링
배치 사이즈 | 256
학습 Epoch | 최대 1000 (일반적으로 200~400)
레이어 수 (Receptive Field Depth, H) | 1
이웃 샘플링 수 (K) | 8
초기화 방식 | Xavier Uniform (명시는 없지만 일반적인 초기화 방식으로 추정됨)

In [13]:
"""
config = {
    'device': 'mps',
    'dataset': 'book',
    'embedding_dim': 64,
    'n_layers': 1,
    'lr': 0.0002,
    'batch_size': 256,
    'l2': 2e-5,
    'n_epoch': 200,
    'n_neighbor': 8,
    'H': 1,
    'aggregator': 'sum'
}
"""

In [None]:
config = {
    'item2id_path': 'data/item_index2entity_id.txt',
    'kg_path': 'data/kg.txt',
    'rating_path': 'data/user_artists.dat',
    'rating_sep': '\t',
    'threshold': 0.0
}

In [None]:
class KGDataLoader:
    def __init__(self, cfg):
        self.cfg = cfg  # 설정 저장

        # 파일 로드
        df_item2id = pd.read_csv(self.cfg['item2id_path'], sep='\t', header=None, names=['item', 'id'])  # 아이템→엔티티 매핑
        df_kg = pd.read_csv(self.cfg['kg_path'], sep='\t', header=None, names=['head', 'relation', 'tail'])  # 지식 그래프
        df_rating = pd.read_csv(self.cfg['rating_path'], sep=self.cfg['rating_sep'], 
                                names=['userID', 'itemID', 'rating'], skiprows=1)  # 사용자-아이템 평점

        # 매핑에 존재하는 아이템만 필터링
        df_rating = df_rating[df_rating['itemID'].isin(df_item2id['item'])]
        df_rating.reset_index(inplace=True, drop=True)

        # 데이터프레임 저장
        self.df_item2id = df_item2id
        self.df_kg = df_kg
        self.df_rating = df_rating

        # 인코더 준비
        self.user_encoder = LabelEncoder()
        self.entity_encoder = LabelEncoder()
        self.relation_encoder = LabelEncoder()

        # ID 인코딩 실행
        self._encoding()

    def _encoding(self): # userID, entityID, relation을 정수로 인코딩
        self.user_encoder.fit(self.df_rating['userID'])
        self.entity_encoder.fit(pd.concat([self.df_item2id['id'], self.df_kg['head'], self.df_kg['tail']]))
        self.relation_encoder.fit(self.df_kg['relation'])

        # 변환 적용
        self.df_kg['head'] = self.entity_encoder.transform(self.df_kg['head'])
        self.df_kg['tail'] = self.entity_encoder.transform(self.df_kg['tail'])
        self.df_kg['relation'] = self.relation_encoder.transform(self.df_kg['relation'])

    def _build_dataset(self): # positive + negative 샘플 생성하여 학습용 데이터셋 구성
        print('Build dataset dataframe ...', end=' ')
        df_dataset = pd.DataFrame()

        # 사용자 ID 인코딩
        df_dataset['userID'] = self.user_encoder.transform(self.df_rating['userID'])

        # 아이템 문자열을 entity ID로 매핑
        item2id_dict = dict(zip(self.df_item2id['item'], self.df_item2id['id']))
        self.df_rating['itemID'] = self.df_rating['itemID'].apply(lambda x: item2id_dict[x])

        # 엔티티 인코딩
        df_dataset['itemID'] = self.entity_encoder.transform(self.df_rating['itemID'])

        # 평점을 임계값 기준으로 이진 라벨 생성
        df_dataset['label'] = self.df_rating['rating'].apply(lambda x: 0 if x < self.cfg['threshold'] else 1)

        # positive만 사용
        df_dataset = df_dataset[df_dataset['label'] == 1]

        # 전체 엔티티 집합에서 negative 샘플링
        full_item_set = set(range(len(self.entity_encoder.classes_)))
        user_list, item_list, label_list = [], [], []
        for user, group in df_dataset.groupby('userID'):
            item_set = set(group['itemID'])  # positive 아이템 집합
            negative_set = full_item_set - item_set  # negative 후보
            negative_sampled = random.sample(list(negative_set), len(item_set))  # 같은 수 만큼 샘플링

            # negative 샘플 저장
            user_list.extend([user] * len(negative_sampled))
            item_list.extend(negative_sampled)
            label_list.extend([0] * len(negative_sampled))

        # negative를 데이터프레임으로 만들고 결합
        negative = pd.DataFrame({'userID': user_list, 'itemID': item_list, 'label': label_list})
        df_dataset = pd.concat([df_dataset, negative])

        # 셔플 및 인덱스 초기화
        df_dataset = df_dataset.sample(frac=1, replace=False, random_state=999)
        df_dataset.reset_index(inplace=True, drop=True)

        print('Done')
        return df_dataset

    def _construct_kg(self): # 양방향 knowledge graph 딕셔너리 구성
        print('Construct knowledge graph ...', end=' ')
        kg = dict()
        for i in range(len(self.df_kg)):
            head = self.df_kg.iloc[i]['head']
            relation = self.df_kg.iloc[i]['relation']
            tail = self.df_kg.iloc[i]['tail']

            # head → tail
            if head in kg:
                kg[head].append((relation, tail))
            else:
                kg[head] = [(relation, tail)]

            # tail → head (양방향)
            if tail in kg:
                kg[tail].append((relation, head))
            else:
                kg[tail] = [(relation, head)]

        print('Done')
        return kg

    def load_dataset(self): # 학습용 사용자-아이템 데이터셋 로드
        return self._build_dataset()

    def load_kg(self): # 지식 그래프 딕셔너리 로드
        return self._construct_kg()

    def get_encoders(self): # user, entity, relation 인코더 반환
        return (self.user_encoder, self.entity_encoder, self.relation_encoder)

    def get_num(self): # 각 인코더의 클래스 수 반환 (모델 입력 차원 계산용)
        return (len(self.user_encoder.classes_), 
                len(self.entity_encoder.classes_), 
                len(self.relation_encoder.classes_))

In [None]:
# 설정 정보를 담은 딕셔너리를 사용해 KGDataLoader 인스턴스를 생성
data_loader = KGDataLoader(cfg=config)

# 학습용 사용자-아이템 데이터셋을 생성 (positive + negative 샘플 포함)
df_dataset = data_loader.load_dataset()

# 레이블(positive: 1, negative: 0) 분포 출력
print("레이블 분포:")
print(df_dataset['label'].value_counts())

# 전체 데이터 중 레이블 비율 (%)로 출력
print("\n레이블 비율 (%):")
print(df_dataset['label'].value_counts(normalize=True) * 100)


In [None]:
df_dataset = data_loader.load_dataset()
# 데이터 타입 확인
print(df_dataset.dtypes)

In [None]:
class Aggregator(torch.nn.Module):
    
    def __init__(self, batch_size, dim, aggregator):
        super(Aggregator, self).__init__()
        self.batch_size = batch_size  # 미니배치 크기
        self.dim = dim                # 임베딩 차원
        self.aggregator = aggregator  # aggregation 방식: sum / concat / neighbor-only

        # aggregator 종류에 따라 파라미터 설정
        if aggregator == 'concat':
            # self 임베딩 + 이웃 임베딩을 concat하면 차원이 2배가 되므로
            self.weights = torch.nn.Linear(2 * dim, dim, bias=True)
        else:
            # sum or neighbor-only인 경우는 차원 그대로
            self.weights = torch.nn.Linear(dim, dim, bias=True)
        
    def forward(self, self_vectors, neighbor_vectors, neighbor_relations, user_embeddings, act):
        # self_vectors: [batch_size, -1, dim]   → 현재 엔티티 (노드) 임베딩
        # neighbor_vectors: [batch_size, -1, n_neighbor, dim] → 이웃 엔티티 임베딩
        # neighbor_relations: [batch_size, -1, n_neighbor, dim] → 이웃 관계 임베딩
        # user_embeddings: [batch_size, dim] → 사용자 임베딩 (user-aware attention에 사용)
        # act: 활성화 함수 (예: torch.relu)
        batch_size = user_embeddings.size(0)

        # batch_size가 변할 수 있어 동적으로 업데이트
        if batch_size != self.batch_size:
            self.batch_size = batch_size

        # 이웃 임베딩을 user-aware 방식으로 합치는 과정
        neighbors_agg = self._mix_neighbor_vectors(neighbor_vectors, neighbor_relations, user_embeddings)

        # aggregation 방식에 따라 self vector와 합치는 방식 달라짐
        if self.aggregator == 'sum':
            # self + neighbor 합산
            output = (self_vectors + neighbors_agg).view((-1, self.dim))

        elif self.aggregator == 'concat':
            # self와 neighbor를 concat한 후 projection
            output = torch.cat((self_vectors, neighbors_agg), dim=-1)  # [batch, -, 2*dim]
            output = output.view((-1, 2 * self.dim))

        else:
            # neighbor-only (self vector는 사용하지 않음)
            output = neighbors_agg.view((-1, self.dim))

        # projection + 활성화
        output = self.weights(output)  # [batch * -, dim]
        return act(output.view((self.batch_size, -1, self.dim)))  # 다시 배치 형태로 reshape

    def _mix_neighbor_vectors(self, neighbor_vectors, neighbor_relations, user_embeddings):
        # 사용자-관계에 따라 이웃 노드들을 가중 평균함 → user-aware attention으로 neighbor vector 집계
        # [batch, dim] → [batch, 1, 1, dim]
        user_embeddings = user_embeddings.view((self.batch_size, 1, 1, self.dim))

        # user와 relation 간의 점곱 score 계산 → [batch, -, n_neighbor]
        user_relation_scores = (user_embeddings * neighbor_relations).sum(dim=-1)

        # softmax로 attention weight 계산
        user_relation_scores_normalized = F.softmax(user_relation_scores, dim=-1)

        # [batch, -, n_neighbor] → [batch, -, n_neighbor, 1]
        user_relation_scores_normalized = user_relation_scores_normalized.unsqueeze(dim=-1)

        # attention weight * neighbor vector → 가중 평균 [batch, -, dim]
        neighbors_aggregated = (user_relation_scores_normalized * neighbor_vectors).sum(dim=2)

        return neighbors_aggregated

In [None]:
class KGCN(torch.nn.Module):
    def __init__(self, num_user, num_ent, num_rel, kg, args, device):
        super(KGCN, self).__init__()
        self.num_user = num_user          # 사용자 수
        self.num_ent = num_ent            # 엔티티 수
        self.num_rel = num_rel            # 관계 수
        self.n_iter = args.n_iter         # GCN 레이어 수 (= neighbor 탐색 깊이)
        self.batch_size = args.batch_size
        self.dim = args.dim               # 임베딩 차원
        self.n_neighbor = args.neighbor_sample_size  # 각 노드마다 샘플링할 이웃 수
        self.kg = kg                      # 지식 그래프 (dict)
        self.device = device              # GPU/CPU
        self.aggregator = Aggregator(self.batch_size, self.dim, args.aggregator)  # Aggregator 모듈

        self._gen_adj()  # 인접 엔티티/관계 샘플링 테이블 생성

        # Embedding layer: 사용자, 엔티티, 관계
        self.usr = torch.nn.Embedding(num_user, args.dim)
        self.ent = torch.nn.Embedding(num_ent, args.dim)
        self.rel = torch.nn.Embedding(num_rel, args.dim)

    def _gen_adj(self): # 엔티티마다 고정 개수의 이웃 엔티티 및 관계를 샘플링하여 인접행렬 생성
        self.adj_ent = torch.empty(self.num_ent, self.n_neighbor, dtype=torch.long)
        self.adj_rel = torch.empty(self.num_ent, self.n_neighbor, dtype=torch.long)

        for e in self.kg:
            if len(self.kg[e]) >= self.n_neighbor:
                neighbors = random.sample(self.kg[e], self.n_neighbor)
            else:
                neighbors = random.choices(self.kg[e], k=self.n_neighbor)  # 중복 허용 샘플링

            self.adj_ent[e] = torch.LongTensor([ent for _, ent in neighbors])
            self.adj_rel[e] = torch.LongTensor([rel for rel, _ in neighbors])

    def forward(self, u, v): # forward 호출 시 (user, item) 쌍 입력 → user-aware item score 출력 / u: [batch_size], v: [batch_size]
        batch_size = u.size(0)
        if batch_size != self.batch_size:
            self.batch_size = batch_size

        # shape 맞추기: [batch_size, 1]
        u = u.view((-1, 1))
        v = v.view((-1, 1))

        # 사용자 임베딩: [batch_size, dim]
        user_embeddings = self.usr(u).squeeze(dim=1)

        # item을 기준으로 multi-hop 이웃 엔티티/관계 가져오기
        entities, relations = self._get_neighbors(v)

        # 이웃 정보를 user-aware 방식으로 aggregation
        item_embeddings = self._aggregate(user_embeddings, entities, relations)

        # 최종 user-item score 계산: 내적 후 sigmoid
        scores = (user_embeddings * item_embeddings).sum(dim=1)

        return torch.sigmoid(scores)

    def _get_neighbors(self, v): # 엔티티 v의 multi-hop 이웃들을 adj matrix 기반으로 샘플링 / v: [batch_size, 1] → 1-hop, 2-hop ...까지 쌓임
        entities = [v]  # 0-hop (자기 자신)
        relations = []

        for h in range(self.n_iter):  # hop 수만큼 반복
            # 현재 hop의 엔티티에서 이웃 추출
            neighbor_entities = torch.LongTensor(self.adj_ent[entities[h].cpu()]) \
                                    .view((self.batch_size, -1)).to(self.device)
            neighbor_relations = torch.LongTensor(self.adj_rel[entities[h].cpu()]) \
                                    .view((self.batch_size, -1)).to(self.device)

            entities.append(neighbor_entities)
            relations.append(neighbor_relations)

        return entities, relations

    def _aggregate(self, user_embeddings, entities, relations): # Aggregator를 사용해 multi-hop 이웃 정보를 통합
        # user_embeddings: [batch_size, dim]
        # entities: hop별 entity 리스트
        # relations: hop별 relation 리스트
        
        # hop 별로 entity / relation 임베딩 추출
        entity_vectors = [self.ent(entity) for entity in entities]
        relation_vectors = [self.rel(relation) for relation in relations]

        # hop 수만큼 반복적으로 aggregation
        for i in range(self.n_iter):
            # 마지막 hop은 tanh, 그 외는 sigmoid
            act = torch.tanh if i == self.n_iter - 1 else torch.sigmoid

            entity_vectors_next_iter = []
            for hop in range(self.n_iter - i):
                vector = self.aggregator(
                    self_vectors=entity_vectors[hop],
                    neighbor_vectors=entity_vectors[hop + 1] \
                        .view((self.batch_size, -1, self.n_neighbor, self.dim)),
                    neighbor_relations=relation_vectors[hop] \
                        .view((self.batch_size, -1, self.n_neighbor, self.dim)),
                    user_embeddings=user_embeddings,
                    act=act
                )
                entity_vectors_next_iter.append(vector)

            # 다음 hop의 결과로 갱신
            entity_vectors = entity_vectors_next_iter

        # 최종 item 임베딩 반환
        return entity_vectors[0].view((self.batch_size, self.dim))