<a href="https://colab.research.google.com/github/HyunWrites0721/LLM_Based_OTT_recommender/blob/main/LLM_Based_OTTRec.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
#데이터 로딩.

import pandas as pd
import os

path = "/content/drive/MyDrive/Colab Notebooks/for_data/ml-1m/"

# 1. Ratings: 유저의 영화 평점 데이터
ratings = pd.read_csv(path + 'ratings.dat', sep='::', engine='python', encoding='latin-1',
                      names=['user_id', 'movie_id', 'rating', 'timestamp'])

# 2. Movies: 영화 제목 및 장르 데이터
movies = pd.read_csv(path + 'movies.dat', sep='::', engine='python', encoding='latin-1',
                     names=['movie_id', 'title', 'genres'])

# 3. Users: 유저 데모그래픽(성별, 나이 등) 데이터
users = pd.read_csv(path + 'users.dat', sep='::', engine='python', encoding='latin-1',
                    names=['user_id', 'gender', 'age', 'occupation', 'zip_code'])

print("데이터 로드 완료!")
print(f"유저 수: {len(users)}, 영화 수: {len(movies)}, 평점 수: {len(ratings)}")

데이터 로드 완료!
유저 수: 6040, 영화 수: 3883, 평점 수: 1000209


In [2]:
#데이터 정제

# 1. 제목(Title)을 기준으로 고유한 영화 목록 생성
unique_movies = movies.drop_duplicates(subset=['title']).copy()

# 2. 새로운 고유 ID 부여 (1부터 시작, 패딩이 0이므로 1부터 시작해야 함.)
unique_movies['new_movie_id'] = range(1, len(unique_movies)+1)

# 3. {기존 ID : 새로운 ID} 매핑 딕셔너리 생성
id_mapping = pd.merge(movies, unique_movies[['title', 'new_movie_id']], on='title')
mapping_dict = dict(zip(id_mapping['movie_id'], id_mapping['new_movie_id']))

# 4. Ratings 데이터 기존 ID를 새로운 고유 ID로 치환 (매핑되지 않는 유령 ID는 제거)
ratings['movie_id'] = ratings['movie_id'].map(mapping_dict)
ratings = ratings.dropna(subset=['movie_id']).astype({'movie_id': int})

# 5. 최종 결과 확인
print(f"기존 영화 수: {len(movies)}")
print(f"정제 후 고유 영화 수: {len(unique_movies)}")

기존 영화 수: 3883
정제 후 고유 영화 수: 3883


In [3]:
print(unique_movies)
print(ratings)

Unnamed: 0,movie_id,title,genres,new_movie_id
0,1,Toy Story (1995),Animation|Children's|Comedy,1
1,2,Jumanji (1995),Adventure|Children's|Fantasy,2
2,3,Grumpier Old Men (1995),Comedy|Romance,3
3,4,Waiting to Exhale (1995),Comedy|Drama,4
4,5,Father of the Bride Part II (1995),Comedy,5
...,...,...,...,...
3878,3948,Meet the Parents (2000),Comedy,3879
3879,3949,Requiem for a Dream (2000),Drama,3880
3880,3950,Tigerland (2000),Drama,3881
3881,3951,Two Family House (2000),Drama,3882


In [5]:
# SASRec 학습용 유저 시퀀스 만들기

# 유저별로 영화 시청 이력을 시간순으로 묶기
user_train = {}
user_valid = {}
user_test = {}

user_group = ratings.groupby('user_id')
#print(user_group.groups)

for user, group in user_group:
    seq = group.sort_values('timestamp')['movie_id'].tolist()

    # 데이터가 너무 적은 유저는 제외 (최소 3개 이상, train/valid/test로 분할하기 위해)
    if len(seq) < 3:
        continue

    # Leave-one-out 분할
    user_train[user] = seq[:-2]
    user_valid[user] = [seq[-2]]
    user_test[user] = [seq[-1]]

print(f"남은 유저 수: {len(user_train)}")
print(f"1번 유저의 학습 시퀀스 예시: {user_train[1][:5]}")


남은 유저 수: 6040
1번 유저의 학습 시퀀스 예시: [3118, 1251, 1673, 1010, 2272]


In [None]:
#평가용 메타데이터1 : total_users, all_item_counts, item_genre_dict

# 1. total_users: 전체 유저 수 계산
total_users = ratings['user_id'].nunique()

# 2. all_item_counts: 아이템별 노출 빈도 (Novelty 계산용)
# 각 영화가 몇 명의 유저에게 소비되었는지 계산
all_item_counts = ratings['movie_id'].value_counts().to_dict()

# 3. item_genre_dict: {new_movie_id: [장르 리스트]} (Diversity, Transition용)
item_genre_dict = {}

for _, row in unique_movies.iterrows():
    new_id = row['new_movie_id']
    # 장르 데이터가 'Action|Sci-Fi' 형태일 경우 리스트로 분할
    genres = row['genres'].split('|')
    item_genre_dict[new_id] = genres

#print(f"전체 유저 수: {total_users}")
#print(f"장르 정보가 포함된 영화 수: {len(item_genre_dict)}")
#print(f"1번 영화의 장르: {item_genre_dict.get(1)}")


#평가용 메타데이터2 : all_genres, genre_to_idx, idx_to_genre

# 모든 고유 장르 추출 및 정렬
all_genres = sorted(list(set([g for genres in item_genre_dict.values() for g in genres])))

# 장르명 <-> 행렬 인덱스 매핑
genre_to_idx = {g: i for i, g in enumerate(all_genres)}
idx_to_genre = {i: g for i, g in enumerate(all_genres)}

#print(f"추출된 총 장르 수: {len(all_genres)}")
#print(f"장르 매핑 예시: {list(genre_to_idx.items())[:3]}")

In [None]:
#평가용 메타데이터3: 장르 전이 행렬 계산 함수: 한 장르에서 다른 장르로 넘어갈 확률을 저장하는 행렬

def build_genre_transition_matrix(ratings, item_genre_dict):
    # 모든 고유 장르 추출
    all_genres = sorted(list(set([g for genres in item_genre_dict.values() for g in genres])))
    genre_to_idx = {g: i for i, g in enumerate(all_genres)}
    num_genres = len(all_genres)

    # matrix[from_genre][to_genre]
    matrix = np.zeros((num_genres, num_genres))

    # 유저별 시퀀스 확인
    for user, group in ratings.groupby('user_id'):
        seq = group.sort_values('timestamp')['movie_id'].tolist()
        for i in range(len(seq) - 1):
            from_genres = item_genre_dict.get(seq[i], [])
            to_genres = item_genre_dict.get(seq[i+1], [])

            # 다중 장르일 경우 모든 조합에 대해 빈도 추가 (분산 처리)
            for fg in from_genres:
                for tg in to_genres:
                    matrix[genre_to_idx[fg]][genre_to_idx[tg]] += 1

    # 행 단위로 정규화 (확률로 변환)
    row_sums = matrix.sum(axis=1)
    # 0으로 나누기 방지
    transition_matrix = np.divide(matrix, row_sums[:, np.newaxis],
                                  where=row_sums[:, np.newaxis]!=0)

    return transition_matrix, genre_to_idx

# 실행 예시
# transition_matrix, genre_mapping = build_genre_transition_matrix(ratings, item_genre_dict)

In [6]:
#SASRecDataset 클래스 정의

import torch
from torch.utils.data import Dataset
import numpy as np

class SASRecDataset(Dataset):
    def __init__(self, user_train, item_count, max_len):
        self.user_train = user_train  # {user ID: [item list]} dictionary
        self.item_count = item_count  # total number of movies
        self.max_len = max_len        # maximum sequence length L
        self.user_list = list(user_train.keys())  # List of user ids

    def __len__(self):
        return len(self.user_list)

    def __getitem__(self, index):
        user = self.user_list[index]
        seq = self.user_train[user]

        # 1. Sequence Padding & Truncating
        # Pad by 0 or Truncate if larger than L
        input_seq = np.zeros([self.max_len], dtype=np.int32)
        pos = np.zeros([self.max_len], dtype=np.int32)  #true next item
        neg = np.zeros([self.max_len], dtype=np.int32)  #not next item

        # nxt는 정답(Positive), idx는 현재 입력(Input)
        nxt = seq[-1]
        idx = self.max_len - 1

        # 뒤에서부터 채워나가는 방식 (Left Padding을 위함)
        for i in reversed(seq[:-1]):
            input_seq[idx] = i
            pos[idx] = nxt
            # Negative Sampling: random sampling from items user haven't seen
            if nxt != 0: # if not padding
                neg[idx] = self._get_neg_item(seq)
            nxt = i
            idx -= 1
            if idx == -1: break

        return torch.LongTensor([user]), torch.LongTensor(input_seq), \
               torch.LongTensor(pos), torch.LongTensor(neg)

    def _get_neg_item(self, seq):
        # 학습 효율을 위해 유저 시퀀스에 없는 아이템을 랜덤하게 뽑음
        t = np.random.randint(1, self.item_count + 1)
        while t in seq:
            t = np.random.randint(1, self.item_count + 1)
        return t

In [7]:
#평가용 단기지표 계산 함수
#반환값: Recall, NDCG

import math

def evaluate_metrics(predictions, ground_truth, k_list=[5, 10]):
    """
    predictions: 모델이 예측한 전체 아이템에 대한 점수 (Batch, Item_Count)
    ground_truth: 실제 정답 아이템 ID (Batch, 1)
    k_list: 계산하고 싶은 K 값의 리스트
    """
    results = {}

    # 1. 상위 K개의 아이템 인덱스(ID) 추출
    # torch.topk는 내림차순으로 가장 높은 값 k개를 반환함
    max_k = max(k_list)
    _, top_indices = torch.topk(predictions, max_k, dim=-1) # (Batch, max_k)

    # 2. 각 K별로 지표 계산
    for k in k_list:
        hit = 0.0
        ndcg = 0.0

        for i in range(len(ground_truth)):
            target = ground_truth[i]       # 실제 정답 ID
            recommended = top_indices[i][:k] # 상위 k개 추천 리스트

            if target in recommended:
                # [Recall 계산] 포함되면 1 아니면 0
                hit += 1.0

                # [NDCG 계산] 정답의 순위(rank)를 찾아 로그 감쇠 적용
                # index는 0부터 시작하므로 +1을 해줌, 또 +1을 하는 이유는 log 때문에. 그래서 +2.
                rank = (recommended == target).nonzero(as_tuple=True)[0].item()
                ndcg += 1.0 / math.log2(rank + 2)

        results[f'Recall@{k}'] = hit / len(ground_truth)
        results[f'NDCG@{k}'] = ndcg / len(ground_truth)

    return results

In [None]:
#평가용 장기지표 계산 함수
#Intra-List Diversity / Novelty / Serendipity / Genre-Transition Probability / Recency-Weighted Satisfaction

def evaluate_long_term_metrics(recommended_list, user_history, ground_truth, item_genre_dict,
                               all_item_counts, total_users, transition_matrix, genre_to_idx):
    """
    ground_truth: 실제 유저가 본 다음 영화 ID (보통 user_test[user][0])
    transition_matrix: build_genre_transition_matrix 함수로 만든 행렬
    genre_to_idx: 장르명 -> 행렬 인덱스 매핑 사전
    """

    # 공통 함수: Jaccard 유사도
    def get_genre_sim(item1, item2):
        g1, g2 = set(item_genre_dict.get(item1, [])), set(item_genre_dict.get(item2, []))
        if not g1 or not g2: return 0
        return len(g1 & g2) / len(g1 | g2)

    # 1. Intra-List Diversity
    sims = []
    for i in range(len(recommended_list)):
        for j in range(i + 1, len(recommended_list)):
            sims.append(get_genre_sim(recommended_list[i], recommended_list[j]))
    intra_diversity = 1 - np.mean(sims) if sims else 0

    # 2. Novelty
    novelty = 0
    for item in recommended_list:
        prob = (all_item_counts.get(item, 0)+1) / (total_users + 1)
        novelty -= np.log2(prob)
    novelty /= len(recommended_list)

    # 3. Serendipity (엄밀한 정의로 수정)
    # 조건: 추천 리스트에 정답(ground_truth)이 포함되어 있어야 하며,
    # 그 정답이 과거 히스토리 아이템들과의 평균 유사도가 낮아야 함.
    serendipity = 0
    if ground_truth in recommended_list:
        # 정답 아이템과 과거 히스토리 간의 평균 유사도 계산
        rel_sims = [get_genre_sim(ground_truth, hist_item) for hist_item in user_history]
        avg_rel_sim = np.mean(rel_sims) if rel_sims else 0
        # 유사도가 낮을수록(즉, 의외일수록) 높은 점수
        serendipity = 1 - avg_rel_sim

    # 4. Genre-Transition Probability (전이 행렬 활용)
    # 마지막 아이템의 장르들로부터 추천 리스트 아이템들의 장르들로 이동할 확률의 평균
    from_genres = item_genre_dict.get(user_history[-1], [])
    transition_scores = []

    for rec_item in recommended_list:
        to_genres = item_genre_dict.get(rec_item, [])
        item_transition_probs = []
        for fg in from_genres:
            for tg in to_genres:
                # 행렬에서 확률값 추출
                f_idx, t_idx = genre_to_idx[fg], genre_to_idx[tg]
                item_transition_probs.append(transition_matrix[f_idx, t_idx])

        if item_transition_probs:
            transition_scores.append(np.mean(item_transition_probs))

    genre_transition = np.mean(transition_scores) if transition_scores else 0

    # 5. Recency-Weighted Satisfaction
    recency_weighted_sim = 0
    lambda_decay = 0.1
    for idx, hist_item in enumerate(reversed(user_history)):
        weight = np.exp(-lambda_decay * idx)
        for rec_item in recommended_list:
            recency_weighted_sim += weight * get_genre_sim(hist_item, rec_item)

    return {
        "Diversity": intra_diversity,
        "Novelty": novelty,
        "Serendipity": serendipity,
        "Transition_Prob": genre_transition,
        "Recency_Satisfy": recency_weighted_sim
    }

In [None]:
#모델 준비

import torch.nn as nn

class SASRec(nn.Module):
    def __init__(self, item_count, hidden_units, num_blocks, num_heads, max_len, dropout_rate, device):
        super(SASRec, self).__init__()
        self.item_count = item_count
        self.device = device

        # 1. Embedding Layers
        # item_count + 1 인 이유는 0번 패딩 때문
        self.item_emb = nn.Embedding(item_count + 1, hidden_units, padding_idx=0)
        self.pos_emb = nn.Embedding(max_len, hidden_units)
        self.emb_dropout = nn.Dropout(dropout_rate)

        # 2. Multi-head Attention Blocks
        self.attention_blocks = nn.ModuleList([
            nn.TransformerEncoderLayer(
                d_model=hidden_units,
                nhead=num_heads,
                dim_feedforward=hidden_units,
                dropout=dropout_rate,
                activation='relu',
                batch_first=True
            ) for _ in range(num_blocks)
        ])

        self.last_layernorm = nn.LayerNorm(hidden_units)

    def forward(self, log_seqs):
        # log_seqs: (Batch, max_len)

        # 아이템 임베딩 + 위치 임베딩
        seqs = self.item_emb(log_seqs) # (Batch, max_len, hidden_units)
        positions = torch.arange(log_seqs.shape[1], device=self.device).unsqueeze(0)
        seqs += self.pos_emb(positions)
        seqs = self.emb_dropout(seqs)

        # 0번 패딩 마스크 생성 (Attention이 패딩을 무시하도록)
        timeline_mask = (log_seqs == 0) # (Batch, max_len)

        # Causal Mask (미래 정보를 보지 못하게 함)
        # nn.TransformerEncoderLayer에서 쓸 수 있도록 상삼각행렬 생성
        attn_mask = torch.triu(torch.ones((log_seqs.shape[1], log_seqs.shape[1]), device=self.device), diagonal=1).bool()

        # Attention Blocks 통과
        for block in self.attention_blocks:
            # src_key_padding_mask와 mask를 사용하여 패딩과 미래 정보를 차단
            seqs = block(seqs, src_mask=attn_mask, src_key_padding_mask=timeline_mask)

        log_feats = self.last_layernorm(seqs) # (Batch, max_len, hidden_units)

        return log_feats