In [None]:
import torch
from torch import nn
from torch.nn import functional as F
import numpy as np
import torch
from torch import Tensor
from torchmetrics import Metric
import torchmetrics


In [None]:
class EarlyStop(): # 모델 학습 중 성능이 더 이상 개선되지 않을 때 학습을 중단
    def __init__(self, max_patience, maximize=False):
        self.maximize=maximize
        self.max_patience = max_patience
        self.best_loss = None # 현재까지 학습중 최적의 loss. 초기에는 선언만 함
        self.patience = max_patience + 0
    def __call__(self, loss):
        if self.best_loss is None: 
            self.best_loss = loss
            self.patience = self.max_patience + 0
        elif loss < self.best_loss:
            self.best_loss = loss
            self.patience = self.max_patience + 0 # 제일 좋은걸로 업데이트, patience 초기화
        else:
            self.patience -= 1 # 점점 인내심이 달아 없어진다..
        return not bool(self.patience)

In [None]:
class GroupwiseMetric(Metric): 
    def __init__(self, metric,
                 grouping = "cell_lines",
                 average = "macro",
                 nan_ignore=False,
                 alpha=0.00001,
                 residualize = False,
                 **kwargs):
        super().__init__(**kwargs)
        self.grouping = grouping
        self.metric = metric
        self.average = average
        self.nan_ignore = nan_ignore
        self.residualize = residualize
        self.alpha = alpha
        self.add_state("target", default=torch.tensor([]))
        self.add_state("pred", default=torch.tensor([]))
        self.add_state("drugs", default=torch.tensor([]))
        self.add_state("cell_lines", default=torch.tensor([]))
        
    def get_residual(self, X, y):
        w = self.get_linear_weights(X, y)
        r = y-(X@w)
        return r
    
    def get_linear_weights(self, X, y):
        A = X.T@X
        Xy = X.T@y
        n_features = X.size(1)
        A.flatten()[:: n_features + 1] += self.alpha
        return torch.linalg.solve(A, Xy).T
    
    def get_residual_ind(self, y, drug_id, cell_id, alpha=0.001):
        X = torch.cat([y.new_ones(y.size(0), 1),
                       torch.nn.functional.one_hot(drug_id),
                       torch.nn.functional.one_hot(cell_id)], 1).float()
        return self.get_residual(X, y)

    def compute(self) -> Tensor:
        if self.grouping == "cell_lines":
            grouping = self.cell_lines
        elif self.grouping == "drugs":
            grouping = self.drugs
        metric = self.metric
        
        if not self.residualize:
            y_obs = self.target
            y_pred = self.pred
        else:
            y_obs = self.get_residual_ind(self.target, self.drugs, self.cell_lines)
            y_pred = self.get_residual_ind(self.pred, self.drugs, self.cell_lines)
        average = self.average
        nan_ignore = self.nan_ignore
        unique = grouping.unique()
        proportions = []
        metrics = []
        for g in unique:
            is_group = grouping == g
            metrics += [metric(y_obs[grouping == g], y_pred[grouping == g])]
            proportions += [is_group.sum()/len(is_group)]
        if average is None:
            return torch.stack(metrics)
        if (average == "macro") & (nan_ignore):
            return torch.nanmean(y_pred.new_tensor([metrics]))
        if (average == "macro") & (not nan_ignore):
            return torch.mean(y_pred.new_tensor([metrics]))
        if (average == "micro") & (not nan_ignore):
            return (y_pred.new_tensor([proportions])*y_pred.new_tensor([metrics])).sum()
        else:
            raise NotImplementedError
    
    def update(self, preds: Tensor, target: Tensor,  drugs: Tensor,  cell_lines: Tensor) -> None:
        self.target = torch.cat([self.target, target])
        self.pred = torch.cat([self.pred, preds])
        self.drugs = torch.cat([self.drugs, drugs]).long()
        self.cell_lines = torch.cat([self.cell_lines, cell_lines]).long()

- **GroupwiseMetric(Metric)**: 모델 평가에서 특정 그룹 단위로 성능 지표를 계산하고, 필요 시 residual을 계산하는 기능. PyTorch의 metric 클래스를 상속한다.
- **\_\_init\_\_**: 
  - `Metric` 클래스는 기본적으로 배치 단위로 계산된 값들을 계속해서 누적하여 최종 메트릭을 계산하는 기능을 제공한다. 예를들어 전체 데이터셋의 평균 손실 등을 개별 배치의 결과를 합산하거나 평균하여 얻어진다. 이러한 데이터 누적과 계산을 위해 `Metric` 클래스는 state 변수를 사용할 수 있다. state 변수는 `Metric` 클래스의 `add_state` 메서드를 통해 정의되며, 이를 통해 메트릭 계산에 필요한 상태 변수를 정의하고 초기화할 수 있다.
  - 정의된 state 변수들은 `Metric` 클래스의 메서드들(`update`, `compute`, `reset`)에 의해 자동으로 관리된다. update 메서드는 새로운 데이터를 받아 state 변수에 추가하고, compute 메서드는 state 변수를 이용하여 최종 메트릭을 계산한다. reset 메서드는 state 변수를 초기화한다.
  - state를 사용함으로써 데이터의 누적과 관리를 자동으로 할 수 있으므로, 편리하다. 그러나 state 변수를 사용하면 메모리 사용량이 증가할 수 있으므로 주의해야 한다.
  - 여기서는 새로운 state 변수 target, pred, drugs, cell_lines를 정의하고 `torch.tensor([])` 형태로 초기화한다. 각 state 변수에 데이터를 축적할 수 있다. 
- **get_linear_weights(self, X, y)**: 
  - linear regression, 그 중 regularized least squares를 통해 weight vector를 계산하며, 정규화 항 `self.alpha`를 사용한다. 다음 방정식을 사용한다.
    $$w= (X^TX)^{-1}X^Ty$$
  - 정규화 항을 추가하는게 조금 복잡한데, 다음을 따른다. 
    - `A.flatten()`은 `A` 행렬을 1차원으로 펼친다.
    * `[:: n_features + 1]`은 대각선 요소만 선택하는 슬라이싱이다. 예를 들어, `n_features=3`이라면 `flatten()[::4]`를 통해 인덱스가 0, 4, 8인 요소(대각 요소)를 선택한다.
    * `+= self.alpha`를 통해 대각 요소에 정규화 항 alpha를 더한다.
  * `torch.linalg.solve(A, Xy).T`는 선형 방정식을 풀어 최종 가중치 벡터 `w`를 반환한다. 
* **get_residual_ind(self, y, drug_id, cell_id, alpha=0.001)**:
  * 주어진 타겟값 `y`, `drug_id`, `cell_id`를 사용하여 linear regression 기반의 residual을 계산한다. 
  * 원핫 인코딩을 통해 `id`값들을 벡터로 변환. 
  * `torch.cat`은 상수항 벡터, 약물 원핫 인코딩 벡터, 세포주 원핫 인코딩 벡터를 col방향으로 결합한다. 
  * `get_residual`을 호출하여 생성한 `X`와 실제 타겟 값 `y`를 기반으로 residual을 계산한다. 
* **compute(self) -> Tensor**:
  * `->`는 파이썬의 type hint이다. 함수의 반환값이 어떤 자료형인지 알려주는 표시이다. 즉 여기서 이 `compute`는 그룹별로 메트릭을 계산하고, 그 결과를 결합하여 최종 성능을 `Tensor`형태의 결과로 출력한다.
  * grouping에 따라, 메트릭 계산을 달리한다. 
  * `self.residualize`가 `True`면 residual을 사용해 메트릭을 계산하고, 아니면 원본 타겟 값과 예측 값을 그대로 사용하여 계산한 후 obs와 pred에 저장한다. 
  * for loop에서는, 각 그룹별로 성능지표를 계산한다. `proportions` 리스트에는 해당 그룹의 샘플 비율을 저장하여, 이후 `micro` 평균 방식에서 사용할 수 있도록 한다.
  * 최종 메트릭 계산
    * `average`가 `None`일 경우, 그룹별 메트릭들을 그대로 반환한다.
    * `macro`일 경우, `nan_ignore`가 True이면 `NaN`값을 무시하고 평균을 계산하고, `False`이면 포함해서 평균을 계산한다.
    * `micro`일 경우, `proportions`를 이용해 각 그룹의 비율을 반영한 가중 평균을 계산한다. 
* **update()**:
  * 매 배치마다 모델의 `preds`, `target`, `drugs`, `cell_lines` 정보를 받아 state 변수에 누적하는 역할을 한다. 누적된 데이터는 이후 `compute`메서드에서 전체 데이터에 대한 메트릭을 계산하는데 사용된다.