In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

class MultiLabelClassifier(nn.Module):
    def __init__(self, transformer_encoder, embedding_dim, num_classes, fc_hidden_dims=[512, 256, 128], dropout=0.1):
        """
        멀티레이블 분류를 위한 클래스
        Args:
            transformer_encoder: 사전 구현된 트랜스포머 인코더 모델
            embedding_dim: 인코더의 출력 임베딩 차원
            num_classes: 분류할 클래스 개수
            fc_hidden_dims: 3개 완전 연결 층의 은닉층 차원 리스트
            dropout: 드롭아웃 비율
        """
        super(MultiLabelClassifier, self).__init__()
        
        # 트랜스포머 인코더
        self.transformer_encoder = transformer_encoder
        
        # 분류 헤드 (3개의 완전 연결 층)
        self.classifier = nn.Sequential(
            # 첫 번째 완전 연결 층
            nn.Linear(embedding_dim, fc_hidden_dims[0]),
            nn.ReLU(),
            nn.Dropout(dropout),
            
            # 두 번째 완전 연결 층
            nn.Linear(fc_hidden_dims[0], fc_hidden_dims[1]),
            nn.ReLU(),
            nn.Dropout(dropout),
            
            # 세 번째 완전 연결 층
            nn.Linear(fc_hidden_dims[1], fc_hidden_dims[2]),
            nn.ReLU(),
            nn.Dropout(dropout),
            
            # 출력 층 (시그모이드 활성화 함수)
            nn.Linear(fc_hidden_dims[2], num_classes),
            nn.Sigmoid()  # 멀티레이블 분류를 위한 시그모이드 활성화 함수
        )
        
    def forward(self, x, mask=None):
        """
        모델 순전파
        Args:
            x: 입력 텐서 (batch_size, seq_length)
            mask: 어텐션 마스크 (optional)
        Returns:
            각 클래스에 대한 확률 (batch_size, num_classes)
        """
        # 트랜스포머 인코더 통과
        encoder_output = self.transformer_encoder(x, mask)  # (batch_size, seq_length, embedding_dim)
        
        # 시퀀스의 [CLS] 토큰(첫 번째 토큰) 사용 또는 전체 시퀀스의 평균 사용
        # 방법 1: [CLS] 토큰 표현 사용
        sequence_representation = encoder_output[:, 0, :]  # (batch_size, embedding_dim)
        
        # 방법 2: 전체 시퀀스의 평균 사용 (선택적)
        # sequence_representation = torch.mean(encoder_output, dim=1)  # (batch_size, embedding_dim)
        
        # 분류 헤드 통과
        logits = self.classifier(sequence_representation)  # (batch_size, num_classes)
        
        return logits

class WeightedBinaryCrossEntropyLoss(nn.Module):
    def __init__(self, pos_weight=None, reduction='mean'):
        """
        가중치가 적용된 바이너리 크로스 엔트로피 손실 함수
        Args:
            pos_weight: 각 클래스의 양성 샘플에 대한 가중치 (클래스별 가중치 텐서)
            reduction: 손실 감소 방식 ('mean', 'sum', 'none')
        """
        super(WeightedBinaryCrossEntropyLoss, self).__init__()
        self.pos_weight = pos_weight
        self.reduction = reduction
        
    def forward(self, logits, targets):
        """
        손실 계산
        Args:
            logits: 모델 출력 (batch_size, num_classes)
            targets: 타겟 레이블 (batch_size, num_classes), 이진값 (0 또는 1)
        Returns:
            손실 값
        """
        # 이미 시그모이드가 적용된 출력이므로 BCELoss 사용
        if self.pos_weight is not None:
            # 클래스별 가중치 적용
            weight = self.pos_weight.expand_as(targets)
            
            # 양성(1) 및 음성(0) 샘플에 대한 가중치 적용
            weight_pos = weight * targets
            weight_neg = (1 - targets)
            weights = weight_pos + weight_neg
            
            # 가중치가 적용된 이진 크로스 엔트로피 손실
            loss = F.binary_cross_entropy(logits, targets, weight=weights, reduction='none')
        else:
            loss = F.binary_cross_entropy(logits, targets, reduction='none')
        
        # 감소 방식에 따라 손실 처리
        if self.reduction == 'mean':
            return loss.mean()
        elif self.reduction == 'sum':
            return loss.sum()
        else:  # 'none'
            return loss

def calculate_class_weights(labels, beta=0.999):
    """
    클래스 불균형을 다루기 위한 가중치 계산 
    (Cui et al., 2019) "Class-balanced loss based on effective number of samples"
    
    Args:
        labels: 훈련 데이터의 레이블 텐서 (num_samples, num_classes)
        beta: 클래스 불균형 조정을 위한 스무딩 팩터
    Returns:
        클래스별 가중치 텐서
    """
    # 각 클래스의 샘플 수 계산
    num_samples_per_class = torch.sum(labels, dim=0)  # (num_classes)
    
    # 0으로 나누기 방지
    num_samples_per_class = torch.clamp(num_samples_per_class, min=1)
    
    # 효과적인 샘플 수 계산
    effective_num = 1.0 - torch.pow(beta, num_samples_per_class)
    effective_num = torch.clamp(effective_num, min=1e-8)
    
    # 클래스 가중치 계산
    weights = (1.0 - beta) / effective_num
    
    # 가중치 정규화 (합이 클래스 수가 되도록)
    weights = weights / torch.sum(weights) * len(num_samples_per_class)
    
    return weights

# 모델 학습 함수
def train_epoch(model, dataloader, criterion, optimizer, device):
    """
    한 에폭 동안 모델 학습
    Args:
        model: 학습할 모델
        dataloader: 데이터 로더
        criterion: 손실 함수
        optimizer: 최적화 알고리즘
        device: 학습 장치 ('cpu' 또는 'cuda')
    Returns:
        평균 손실
    """
    model.train()
    total_loss = 0.0
    
    for batch_idx, (inputs, targets) in enumerate(dataloader):
        # 데이터를 지정된 장치로 이동
        inputs, targets = inputs.to(device), targets.to(device)
        
        # 순전파
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        
        # 역전파 및 최적화
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
        
    return total_loss / len(dataloader)

# 모델 평가 함수
def evaluate(model, dataloader, criterion, device, threshold=0.5):
    """
    모델 평가
    Args:
        model: 평가할 모델
        dataloader: 데이터 로더
        criterion: 손실 함수
        device: 평가 장치
        threshold: 분류 임계값
    Returns:
        평균 손실, 정밀도, 재현율, F1 점수
    """
    model.eval()
    total_loss = 0.0
    
    all_preds = []
    all_targets = []
    
    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs, targets = inputs.to(device), targets.to(device)
            
            # 순전파
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            
            total_loss += loss.item()
            
            # 임계값 적용하여 예측 결과 변환 (0.5 이상이면 1, 미만이면 0)
            preds = (outputs >= threshold).float()
            
            all_preds.append(preds.cpu())
            all_targets.append(targets.cpu())
    
    # 예측값과 타겟값 결합
    all_preds = torch.cat(all_preds, dim=0)
    all_targets = torch.cat(all_targets, dim=0)
    
    # 평가 지표 계산
    # 각 클래스별 TP, FP, FN 계산
    tp = torch.sum((all_preds == 1) & (all_targets == 1), dim=0).float()
    fp = torch.sum((all_preds == 1) & (all_targets == 0), dim=0).float()
    fn = torch.sum((all_preds == 0) & (all_targets == 1), dim=0).float()
    
    # 정밀도, 재현율, F1 점수 계산
    precision = tp / (tp + fp + 1e-8)
    recall = tp / (tp + fn + 1e-8)
    f1 = 2 * precision * recall / (precision + recall + 1e-8)
    
    # 매크로 평균 (모든 클래스에 대한 평균)
    macro_precision = precision.mean().item()
    macro_recall = recall.mean().item()
    macro_f1 = f1.mean().item()
    
    return total_loss / len(dataloader), macro_precision, macro_recall, macro_f1