In [2]:
import inspect
import os

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from PIL import Image
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from transformers import BertTokenizer, BertModel
from transformers import AutoModel  # Electra, BERT 모델을 동적으로 불러오기 위해

## 전처리

In [3]:
# Load the raw tables. DATA_DIR is the single place to edit when the data
# lives somewhere else; the resulting paths are the same as before.
DATA_DIR = '../../data'
books_df = pd.read_csv(f'{DATA_DIR}/books.csv')
users_df = pd.read_csv(f'{DATA_DIR}/users.csv')
train_ratings_df = pd.read_csv(f'{DATA_DIR}/train_ratings.csv')
test_ratings_df = pd.read_csv(f'{DATA_DIR}/test_ratings.csv')

In [4]:
# Coerce unparseable ages to NaN, then impute every gap with the median age.
age_numeric = pd.to_numeric(users_df['age'], errors='coerce')
median_age = age_numeric.median()
users_df['age'] = age_numeric.fillna(median_age)

In [5]:
# Coerce unparseable years to NaN, impute with the median year, and store the
# column as integers.
year_numeric = pd.to_numeric(books_df['year_of_publication'], errors='coerce')
median_year = year_numeric.median()
books_df['year_of_publication'] = year_numeric.fillna(median_year).astype(int)

In [6]:
# Attach book and user metadata to every rating row.
train_data = train_ratings_df.merge(books_df, on='isbn').merge(users_df, on='user_id')
test_data = test_ratings_df.merge(books_df, on='isbn').merge(users_df, on='user_id')

# merge() defaults to an inner join: a test row whose isbn/user_id has no match
# is dropped silently, and duplicate keys in books/users would duplicate rows.
# Either case yields a submission with the wrong number of rows, so fail here
# rather than after training.
assert len(test_data) == len(test_ratings_df), (
    f"merge changed the number of test rows: {len(test_ratings_df)} -> {len(test_data)}"
)


In [7]:
# Mean rating per user, computed from the training interactions.
user_avg_rating = (
    train_data.groupby('user_id', as_index=False)['rating']
    .mean()
    .rename(columns={'rating': 'user_mean_rating'})
)

# Mean rating per book, computed from the training interactions.
book_avg_rating = (
    train_data.groupby('isbn', as_index=False)['rating']
    .mean()
    .rename(columns={'rating': 'book_mean_rating'})
)

# Attach both aggregates to the training frame (default inner join).
train_data = (
    train_data
    .merge(user_avg_rating, on='user_id')
    .merge(book_avg_rating, on='isbn')
)

# Attach the training-derived aggregates to the test frame; the left join keeps
# test rows whose user or book never appeared in training (values stay NaN).
test_data = (
    test_data
    .merge(user_avg_rating, on='user_id', how='left')
    .merge(book_avg_rating, on='isbn', how='left')
)

In [8]:
# Derive a country column from the last comma-separated piece of `location`.
# The vectorised .str accessor returns the same value as the former
# x.split(',')[-1].strip() for strings, but propagates NaN for a missing
# location instead of raising AttributeError.
train_data['country'] = train_data['location'].str.split(',').str[-1].str.strip()
test_data['country'] = test_data['location'].str.split(',').str[-1].str.strip()

# Target encoding: mean training rating per country.
target_encoding_map = train_data.groupby('country')['rating'].mean().to_dict()
train_data['country_encoded'] = train_data['country'].map(target_encoding_map)

# Apply the training map to the test frame; countries that are unseen (or
# missing) fall back to the overall mean training rating.
global_mean = train_data['rating'].mean()
test_data['country_encoded'] = test_data['country'].map(target_encoding_map).fillna(global_mean)


In [9]:
# Replace missing author/summary text with a single space in both frames.
for frame in (train_data, test_data):
    for text_col in ('book_author', 'summary'):
        frame[text_col] = frame[text_col].fillna(' ')

In [10]:
# Test rows without a user/book mean rating (left-join misses) receive the mean
# of that column over the test frame.
for mean_col in ('user_mean_rating', 'book_mean_rating'):
    column_mean = test_data[mean_col].mean()
    test_data[mean_col] = test_data[mean_col].fillna(column_mean)

In [11]:
# Keep only the columns used downstream (plus the target for training).
features = ['user_id', 'isbn', 'book_title', 'book_author', 'year_of_publication', 'publisher', 
            'age', 'img_path', 'summary', 'user_mean_rating', 'book_mean_rating', 'country_encoded']
# .copy() makes these independent frames: later cells assign columns into them
# (scaling, predictions), which on a plain column subset triggers
# SettingWithCopyWarning and may not write through reliably.
train_data = train_data[features + ['rating']].copy()
test_data = test_data[features].copy()


In [12]:
from transformers import AutoTokenizer

# Tokenizer for the ELECTRA checkpoint that the model classes use as their
# default text_model_name. (An earlier version of this cell used
# BertTokenizer with 'bert-base-uncased'.)
tokenizer_checkpoint = "google/electra-small-discriminator"
tokenizer = AutoTokenizer.from_pretrained(tokenizer_checkpoint)


In [13]:
# Image preprocessing pipeline: resize to 224x224, convert to a tensor, then
# normalise each channel with the mean/std values below (the statistics
# conventionally paired with torchvision's pretrained models).
channel_mean = [0.485, 0.456, 0.406]
channel_std = [0.229, 0.224, 0.225]
image_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=channel_mean, std=channel_std),
])

In [14]:
# Standardise the numerical columns. The scaler is fitted on the training
# frame only and then applied unchanged to both frames.
numerical_features = ['year_of_publication', 'age', 'user_mean_rating', 'book_mean_rating']
scaler = StandardScaler().fit(train_data[numerical_features])
train_data[numerical_features] = scaler.transform(train_data[numerical_features])
test_data[numerical_features] = scaler.transform(test_data[numerical_features])

## WDN 구현 

In [15]:
# Wide & Deep network for rating regression
class WideAndDeepModel(nn.Module):
    """Wide & Deep regressor over image, text and numerical inputs.

    The wide part is a single linear layer over the wide features. The deep
    part encodes the image (ResNet-18), the text (a Hugging Face encoder) and
    the numerical features into 128-d vectors each, fuses them with an MLP and
    concatenates the result with the wide output before the final linear layer.

    Args:
        num_numerical_features: width of the numerical (and wide) feature
            vector.
        text_model_name: Hugging Face checkpoint name for the text encoder.
    """

    def __init__(self, num_numerical_features, text_model_name="google/electra-small-discriminator"):
        super().__init__()

        # Wide part: one linear unit over the raw features.
        self.wide = nn.Linear(num_numerical_features, 1)

        # Deep part - image encoder; the classifier head is replaced by a
        # 128-d projection.
        self.image_encoder = models.resnet18(pretrained=True)
        self.image_encoder.fc = nn.Linear(self.image_encoder.fc.in_features, 128)

        # Deep part - text encoder. Read the embedding width from the loaded
        # config so checkpoints other than electra-small / bert-base get the
        # right projection size; the name-based guess is only a fallback.
        self.text_encoder = AutoModel.from_pretrained(text_model_name)
        text_embedding_dim = getattr(self.text_encoder.config, "hidden_size", None)
        if text_embedding_dim is None:
            text_embedding_dim = 256 if "electra" in text_model_name else 768
        self.text_fc = nn.Linear(text_embedding_dim, 128)

        # Deep part - numerical feature encoder.
        self.numerical_encoder = nn.Sequential(
            nn.Linear(num_numerical_features, 64),
            nn.ReLU(),
            nn.Linear(64, 128)
        )

        # Deep part - fusion of the three 128-d modality vectors.
        self.deep_fusion = nn.Sequential(
            nn.Linear(128 * 3, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU()
        )

        # Final layer over [wide output (1) | deep output (128)].
        self.final_layer = nn.Linear(128 + 1, 1)

    def forward(self, image, text_ids, text_mask, numerical_features, wide_features):
        """Predict one rating per sample.

        Args:
            image: image batch passed to the image encoder.
            text_ids: token ids passed to the text encoder as `input_ids`.
            text_mask: attention mask passed to the text encoder.
            numerical_features: numerical features for the deep part.
            wide_features: features for the wide linear layer.

        Returns:
            Tensor with the last (size-1) dimension removed, i.e. one value
            per sample; the batch dimension is kept even for a batch of one.
        """
        # Wide part output; kept with its trailing size-1 dim so it can be
        # concatenated with the deep output along dim=1.
        y_wide = self.wide(wide_features)

        # Deep part - image encoding.
        image_features = self.image_encoder(image)

        # Deep part - text encoding. Use the pooled output when the encoder
        # provides one, otherwise the hidden state of the first token.
        text_output = self.text_encoder(input_ids=text_ids, attention_mask=text_mask)
        pooled = getattr(text_output, "pooler_output", None)
        if pooled is None:
            pooled = text_output.last_hidden_state[:, 0, :]
        text_features = self.text_fc(pooled)

        # Deep part - numerical encoding.
        numerical_features = self.numerical_encoder(numerical_features)

        # Deep part - fuse image, text and numerical features.
        combined_features = torch.cat([image_features, text_features, numerical_features], dim=1)
        y_deep = self.deep_fusion(combined_features)

        # Combine wide and deep outputs.
        final_features = torch.cat([y_wide, y_deep], dim=1)
        rating = self.final_layer(final_features)

        # squeeze(-1) drops only the output dim; a bare squeeze() would also
        # drop the batch dim when the batch holds a single sample.
        return rating.squeeze(-1)


In [16]:
# Dataset class
class BookDataset(Dataset):
    """Dataset yielding tokenised text, cover image, numerical features and label.

    Args:
        dataframe: frame with `book_title`, `book_author`, `summary`,
            `img_path`, the numerical feature columns and optionally `rating`.
        tokenizer: callable Hugging Face tokenizer.
        numerical_features: names of the columns returned as `numerical`.
        max_length: token length every text is padded/truncated to.
        image_dir: directory the image files are read from.
        transform: callable applied to the loaded RGB image; when None the
            notebook-level `image_transform` is used.
    """

    def __init__(self, dataframe, tokenizer, numerical_features, max_length=128,
                 image_dir='/data/ephemeral/home/data/images', transform=None):
        self.data = dataframe
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.numerical_features = numerical_features
        self.image_dir = image_dir
        self.transform = transform

    def __len__(self):
        """Return the number of rows in the underlying frame."""
        return len(self.data)

    @staticmethod
    def _build_text(row):
        """Join title, author and summary into one whitespace-normalised string.

        Missing fields are skipped. The missing-value check has to happen
        before string formatting: once a NaN is formatted it becomes the
        literal text "nan" and can no longer be detected.
        """
        parts = []
        for field in ('book_title', 'book_author', 'summary'):
            value = row[field]
            if pd.notna(value):
                parts.append(str(value))
        return ' '.join(' '.join(parts).split())

    def __getitem__(self, idx):
        """Return the sample at position `idx` as a dict of tensors."""
        row = self.data.iloc[idx]

        # Text
        text = self._build_text(row)
        encoding = self.tokenizer(
            text,
            add_special_tokens=True,
            max_length=self.max_length,
            return_token_type_ids=False,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )

        # Image: img_path already carries an 'images/' prefix, so strip it
        # before joining with the image directory.
        image_filename = row['img_path'].replace('images/', '')
        image_path = os.path.join(self.image_dir, image_filename)
        # The context manager closes the file handle; convert() returns an
        # independent in-memory image that stays valid afterwards.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert('RGB')
        transform = self.transform if self.transform is not None else image_transform
        image = transform(image)

        # Numerical features
        numerical = torch.tensor([row[feature] for feature in self.numerical_features], dtype=torch.float)

        # Label; frames without a 'rating' column (test data) get a dummy 0.
        if 'rating' in row:
            label = torch.tensor(row['rating'], dtype=torch.float)
        else:
            label = torch.tensor(0, dtype=torch.float)

        return {
            'text_ids': encoding['input_ids'].flatten(),
            'text_mask': encoding['attention_mask'].flatten(),
            'image': image,
            'numerical': numerical,
            'label': label
        }

In [17]:
# Hold out 20% of the training rows for validation; the fixed random_state
# makes the split repeatable. train_test_split is already imported in the
# notebook's first cell.
train_data_1, val_data = train_test_split(
    train_data,
    random_state=42,
    test_size=0.2,
)

In [18]:
# Build the datasets. The training dataset must use train_data_1 (the 80%
# split): passing the full train_data would also train on every validation
# row and make the validation loss meaningless.
train_dataset = BookDataset(train_data_1, tokenizer, numerical_features=numerical_features)
val_dataset = BookDataset(val_data, tokenizer, numerical_features=numerical_features)
test_dataset = BookDataset(test_data, tokenizer, numerical_features=numerical_features)


# Only the training loader shuffles; validation/test keep row order so that
# predictions line up with the rows of test_data.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)


In [19]:
class MultimodalBookRatingModel(nn.Module):
    """Rating regressor that fuses image, text and numerical features with an MLP.

    Each modality is encoded into a 128-d vector (ResNet-18 for the image, a
    Hugging Face encoder for the text, a small MLP for the numerical
    features); the concatenation is passed through the fusion MLP, which ends
    in a single output unit.

    Args:
        num_numerical_features: width of the numerical feature vector.
        text_model_name: Hugging Face checkpoint name for the text encoder.
    """

    def __init__(self, num_numerical_features, text_model_name="google/electra-small-discriminator"):
        super().__init__()

        # Image encoder; the classifier head is replaced by a 128-d projection.
        self.image_encoder = models.resnet18(pretrained=True)
        self.image_encoder.fc = nn.Linear(self.image_encoder.fc.in_features, 128)

        # Text encoder.
        self.text_encoder = AutoModel.from_pretrained(text_model_name)

        # Read the embedding width from the loaded config so checkpoints other
        # than electra-small / bert-base get the right projection size; the
        # name-based guess is only a fallback.
        text_embedding_dim = getattr(self.text_encoder.config, "hidden_size", None)
        if text_embedding_dim is None:
            text_embedding_dim = 256 if "electra" in text_model_name else 768
        self.text_fc = nn.Linear(text_embedding_dim, 128)

        # Numerical feature encoder.
        self.numerical_encoder = nn.Sequential(
            nn.Linear(num_numerical_features, 64),
            nn.ReLU(),
            nn.Linear(64, 128)
        )

        # Multimodal fusion head.
        self.fusion = nn.Sequential(
            nn.Linear(128 * 3, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, 1)
        )

    def forward(self, image, text_ids, text_mask, numerical_features):
        """Predict one rating per sample.

        Args:
            image: image batch passed to the image encoder.
            text_ids: token ids passed to the text encoder as `input_ids`.
            text_mask: attention mask passed to the text encoder.
            numerical_features: numerical feature batch.

        Returns:
            Tensor with the last (size-1) dimension removed, i.e. one value
            per sample; the batch dimension is kept even for a batch of one.
        """
        # Image encoding.
        image_features = self.image_encoder(image)

        # Text encoding. Use the pooled output when the encoder provides one,
        # otherwise the hidden state of the first token.
        text_output = self.text_encoder(input_ids=text_ids, attention_mask=text_mask)
        pooled = getattr(text_output, "pooler_output", None)
        if pooled is None:
            pooled = text_output.last_hidden_state[:, 0, :]
        text_features = self.text_fc(pooled)

        # Numerical encoding.
        numerical_features = self.numerical_encoder(numerical_features)

        # Concatenate the three modality vectors.
        combined_features = torch.cat([image_features, text_features, numerical_features], dim=1)

        # Rating prediction.
        rating = self.fusion(combined_features)

        # squeeze(-1) drops only the output dim; a bare squeeze() would also
        # drop the batch dim when the batch holds a single sample.
        return rating.squeeze(-1)


- WDN / 2-layer MLP 중 하나 선택하여 model 설정

In [20]:
# Build the model and move it to the GPU when one is available.
# To use the plain fusion MLP instead of the Wide & Deep network, construct
# MultimodalBookRatingModel(num_numerical_features=len(numerical_features)).
target_device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = WideAndDeepModel(num_numerical_features=len(numerical_features)).to(target_device)

# Loss function and optimizer.
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)


  return self.fget.__get__(instance, owner)()


- WDN / 2-layer MLP 중 하나 선택하여 model 설정

In [44]:
def _forward_batch(model, batch, device):
    """Move one batch to `device`, run `model` on it and return (outputs, labels).

    The call is adapted to the model: when `model.forward` declares a
    `wide_features` parameter (the Wide & Deep network) the numerical features
    are also passed as the wide features; otherwise the four-argument form is
    used. Outputs are reshaped to the labels' shape so the loss never
    broadcasts a 0-d or (batch, 1) prediction against (batch,) labels.
    """
    text_ids = batch['text_ids'].to(device)
    text_mask = batch['text_mask'].to(device)
    image = batch['image'].to(device)
    numerical = batch['numerical'].to(device)
    labels = batch['label'].to(device)

    if 'wide_features' in inspect.signature(model.forward).parameters:
        outputs = model(image, text_ids, text_mask, numerical, wide_features=numerical)
    else:
        outputs = model(image, text_ids, text_mask, numerical)

    return outputs.reshape(labels.shape), labels


# Training function
def train_epoch(model, data_loader, criterion, optimizer, device):
    """Run one optimisation pass over `data_loader`.

    Returns:
        The mean of the per-batch losses (sum of batch losses divided by the
        number of batches).
    """
    model.train()
    total_loss = 0
    for batch in data_loader:
        optimizer.zero_grad()

        outputs, labels = _forward_batch(model, batch, device)
        loss = criterion(outputs, labels)

        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    return total_loss / len(data_loader)

# Evaluation function
def evaluate(model, data_loader, criterion, device):
    """Compute the mean per-batch loss over `data_loader` without updating the model.

    The model is put in eval mode and gradients are disabled.
    """
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for batch in data_loader:
            outputs, labels = _forward_batch(model, batch, device)
            loss = criterion(outputs, labels)

            total_loss += loss.item()

    return total_loss / len(data_loader)


In [45]:
# Training loop: one optimisation pass followed by one validation pass per
# epoch, logging both losses.
use_cuda = torch.cuda.is_available()
device = torch.device('cuda' if use_cuda else 'cpu')
num_epochs = 10

for epoch in range(num_epochs):
    train_loss = train_epoch(model, train_loader, criterion, optimizer, device)
    val_loss = evaluate(model, val_loader, criterion, device)
    print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')

In [None]:
# Prepare for test-set prediction: an empty accumulator for the outputs, and
# the model switched to eval mode.
predictions = list()
model.eval()

In [None]:
# Reset state here as well, so this cell gives the same result when it is
# re-run on its own (otherwise predictions from the previous run would be kept
# and the list would no longer match the number of test rows).
model.eval()
predictions = []

with torch.no_grad():
    for batch in test_loader:
        text_ids = batch['text_ids'].to(device)
        text_mask = batch['text_mask'].to(device)
        image = batch['image'].to(device)
        numerical = batch['numerical'].to(device)

        # Wide & Deep call; for MultimodalBookRatingModel drop the
        # wide_features argument.
        outputs = model(image, text_ids, text_mask, numerical, wide_features=numerical) # WDN
        # reshape(-1) guarantees a 1-D array: a batch of one may come back as
        # a 0-d value, which extend() cannot iterate over.
        predictions.extend(outputs.cpu().numpy().reshape(-1))


In [None]:
# Store the predictions as the test frame's rating column, then replace any
# missing prediction with the mean of the predictions.
test_data['rating'] = predictions
mean_prediction = test_data['rating'].mean()
test_data['rating'] = test_data['rating'].fillna(mean_prediction)

In [None]:
# Write the submission file: identifier columns plus the predicted rating,
# without the index.
submission_columns = ['user_id', 'isbn', 'rating']
submission = test_data[submission_columns]
submission.to_csv('submission.csv', index=False)

print("예측이 완료되었습니다. 결과는 'submission.csv' 파일에 저장되었습니다.")