In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
from typing import Optional, List

In [2]:
class LoRALayer():
    def __init__(
            self,
            r: int,
            lora_alpha: int,
            lora_dropout: float,
            merge_weights: bool,
    ):
        self.r = r #Low-rank 차원 설정
        self.lora_alpha = lora_alpha #LoRA에서 scaling factor
        # Optional dropout: 드롭아웃을 사용하는 경우
        if lora_dropout > 0.:
            self.lora_dropout = nn.Dropout(p=lora_dropout)
        else:
            self.lora_dropout = lambda x: x #드랍아웃이 0이면 그대로 반환
        self.merged = False #가중치 병합 여부 초기화
        self.merge_weights = merge_weights #가중치를 병합할지 여부

### 1. 가중치 병합 여부 초기화

`self.merged = False`는 **가중치가 병합되었는지 여부를 추적**하기 위해 사용됩니다. LoRA에서는 기본적으로 사전 학습된 모델의 가중치를 그대로 유지하면서, low-rank 가중치인 `lora_A`와 `lora_B` 행렬을 추가로 학습합니다.

LoRA가 적용된 모델은 두 가지 상태를 가질 수 있습니다:
- **병합되지 않은 상태 (merged = False)**: 이 상태에서는 사전 학습된 가중치와 LoRA 가중치(`lora_A`, `lora_B`)가 각각 분리되어 있습니다. 학습 중일 때는 이 상태에서 LoRA 가중치만 업데이트됩니다.
- **병합된 상태 (merged = True)**: 이 상태에서는 LoRA 가중치가 사전 학습된 가중치와 병합되어 하나의 가중치 행렬로 사용됩니다. 병합은 모델을 추론(inference) 단계에서 사용할 때 주로 일어나며, 병합된 가중치로 빠르게 계산할 수 있습니다.

즉, `self.merged`는 현재 모델이 **LoRA 가중치를 사전 학습된 가중치와 병합한 상태인지 아닌지**를 나타내며, 이 값에 따라 병합된 상태로 추론할지, 아니면 병합되지 않은 상태로 학습을 계속할지를 결정하게 됩니다.

### 2. 가중치를 병합할지 여부

여기서 **가중치를 병합할지 여부**는 `merge_weights` 변수를 의미합니다. 병합이라는 개념은 **LoRA 가중치(추가된 A, B 행렬)**와 **사전 학습된 가중치**를 **하나의 가중치로 합쳐서 계산**할지를 결정하는 것과 관련이 있습니다.

자세히 설명하면:

- **이전 학습된 모델의 가중치**는 LoRA가 적용되기 전에 사전 학습된 가중치(weight)를 의미합니다. 이 가중치들은 일반적으로 freeze(고정)되어 있고, 학습 중에 업데이트되지 않습니다.
  
- **LoRA 가중치**는 추가로 학습하는 **저차원(low-rank)** 가중치들입니다. 이는 `lora_A`와 `lora_B`로 나누어져 있으며, 학습 중에 이 두 행렬만 업데이트됩니다.

LoRA의 핵심 아이디어는 **A와 B 행렬을 곱한 값**을 기존 사전 학습된 가중치에 더해 모델이 새로운 상황에 적응할 수 있도록 하는 것입니다.

#### 병합의 의미:
- **병합되지 않은 상태**: 기존 사전 학습된 가중치와 LoRA 가중치를 **별도로 유지**하며, 계산 시마다 두 개의 가중치를 이용해 결과를 생성합니다. 즉, 추론 시마다 기존 가중치와 LoRA 가중치를 모두 고려해서 계산하게 됩니다.
  
- **병합된 상태**: LoRA 가중치 (`lora_A`, `lora_B`)를 **기존 사전 학습된 가중치에 미리 더해서 하나의 통합된 가중치 행렬**을 만들고, 추론 시에는 **그 통합된 가중치만** 사용해 계산을 더 빠르게 할 수 있습니다. 이 과정에서 LoRA 가중치가 **기존 가중치와 "병합"**되는 것입니다.

따라서 **가중치를 병합한다는 것**은:
- LoRA의 A, B 행렬을 **기존 사전 학습된 가중치**에 더해서 **하나의 새로운 가중치 행렬로 만들어** 사용하는 것을 의미합니다.
- 추론 단계에서 속도를 높이기 위해 병합된 가중치를 사용할 수 있으며, 병합된 후에는 더 이상 LoRA의 A, B 행렬을 별도로 계산할 필요가 없어집니다.

### 요약

- **가중치 병합 여부 초기화**는 LoRA 가중치와 사전 학습된 가중치가 병합되었는지 여부를 추적하는 플래그입니다.
- **가중치를 병합할지 여부**는 LoRA의 A, B 행렬을 기존 사전 학습된 가중치에 더해서 하나의 가중치로 만들지, 아니면 그대로 별도로 계산할지를 결정합니다.

In [5]:
#LoRA가 적용된 Embedding 레이어 정의
class Embedding(nn.Embedding, LoRALayer):
    def __init__(
            self,
            num_embeddings: int, #임베딩 모델이 처리할 수 있는 전체 토큰의 수
            embedding_dim: int, #임베딩 차원 수
            r: int = 0, #Low-rank 차원
            lora_alpha: int = 1, #LoRA 스케일리 계수
            merge_weights: bool = True, #가중치 병합 여부
            **kwargs
    ):
        nn.Embedding.__init__(self, num_embeddings, embedding_dim, **kwargs) # nn.Embedding 초기화
        LoRALayer.__init__(self, r=r, lora_alpha=lora_alpha, lora_dropout=0, merge_weights=merge_weights) #LoRA 레이어 초기화
        if r > 0: #Low-rank 차원이 0보다 큰 경우, 학습 가능한 파라미터 설정
            self.lora_A = nn.Parameter(self.weight.new_zeros((r, num_embeddings))) # A 행렬 (r x num_embeddings)
            self.lora_B = nn.Parameter(self.weight.new_zeros((embedding_dim, r))) # B 행렬 (embedding_dim x r)
            self.scaling = self.lora_alpha / self.r
            self.weight.requires_grad = False # 사전 학습된 weights는 고정

        self.reset_parameters() #파라미터 초기화

    def reset_parameters(self):
        nn.Embedding.reset_parameters(self)
        if hasattr(self, 'lora_A'):
            # A는 0으로, B는 정규분포로 초기화
            nn.init.zeros_(self.lora_A)
            nn.init.normal_(self.lora_B)
    
    #train 함수는 학습 모드와 추론 모드를 설정하고, LoRA 가중치 병합 상태를 관리
    def train(self, mode: bool = True):
        nn.Embedding.train(self, mode)
        if mode:
            if self.merge_weights and self.merged:
                #mode=True에서 병합 해제는 학습 시 LoRA의 A, B 행렬을 업데이트할 수 있도록 하기 위함
                if self.r > 0 :
                    self.weight.data -= (self.lora_B @ self.lora_A).transpose(0,1) * self.scaling
                self.merged = False
        else:
            if self.merge_weights and not self.merged:
                # mode=False에서 병합은 평가 시 A, B 행렬과 기존 가중치를 하나로 합쳐 빠른 추론을 하기 위함
                if self.r > 0:
                    self.weight.data += (self.lora_B @ self.lora_A).transpose(0,1) * self.scaling
                self.merged = True # 병합 상태로 변경
        #r > 0일 때만 해제/병합을 하는 이유는 A, B 행렬이 존재할 때만 그 가중치를 조정할 필요가 있기 때문

    def forward(self, x: torch.Tensor):
        if self.r > 0 and not self.merged:
            result = nn.Embedding.forward(self, x)
            #lora_A는 각 토큰에 대해 저차원 임베딩 벡터를 학습하는 역할을 하고, x에 해당하는 토큰 인덱스를 기반으로 lora_A에서 임베딩 벡터를 추출
            after_A = F.embedding(
                x, #입력 텐서
                self.lora_A.transpose(0,1), #임베딩으로 사용할 가중치 행렬
                self.padding_idx, #패딩 인덱스로 -1인 경우 해당 인덱스는 무시됨
                self.max_norm, #해당 값 이하로 정규화 됨. 임베딩 벡터의 크기를 제한하기 위해 사용
                self.norm_type, #정규화시 사용할 L-norm의 유형을 설정. 2일 경우 L2 사용됨
                self.scale_grad_by_freq, 
                #입력 텐서의 빈도에 따라 그래디언트가 조정될지 여부 설정. 
                # True 일경우 자주 등장하는 단어에 대한 그래디언트는 줄어들고, 드물게 등장하는 단어에 대한 그래디언트는 증가
                self.sparse #희소 행렬(sparse matrix)을 사용해 그래디언트를 계산할지 여부를 설정
            ) #LoRA A 행렬 적용
            result += (after_A @ self.lora_B.transpose(0,1)) @ self.scaling
            return result
        
        else:
            return nn.Embedding.forward(self,x)

`lora_B`를 정규분포로 초기화하는 이유는 **학습의 안정성과 효율성**을 고려한 것입니다. 정규분포로 초기화하는 것은 일반적으로 신경망의 가중치를 초기화할 때 흔히 사용되는 방법 중 하나로, 몇 가지 이유가 있습니다:

### 1. **학습의 안정성**
   - 가중치를 무작위로 초기화하면 학습이 불안정해질 수 있습니다. 너무 큰 값이나 너무 작은 값으로 초기화할 경우, 신경망에서 **기울기 소실(vanishing gradients)** 또는 **기울기 폭발(exploding gradients)** 문제가 발생할 수 있습니다.
   - 정규분포를 사용하면 가중치 값들이 특정 범위 내에서 적절히 분포되기 때문에, 초기 학습 단계에서 모델의 출력과 기울기 값이 과도하게 커지거나 작아지는 문제를 줄일 수 있습니다.

### 2. **랜덤 초기화의 문제**
   - **완전히 랜덤하게 초기화**하면, 가중치 값들이 너무 극단적인 값(예: 매우 큰 값, 매우 작은 값)을 가질 가능성이 높아집니다. 이는 학습이 시작될 때 비효율적일 수 있습니다.
   - 정규분포를 사용하면 평균이 0인 값들로 초기화되므로, 가중치의 초기 값들이 어느 정도 균형을 이루게 됩니다. 이는 모델이 학습을 시작할 때 **일관된 기울기 흐름**을 유지할 수 있게 도와줍니다.

### 3. **LoRA의 특성**
   - LoRA는 low-rank 행렬인 `lora_A`와 `lora_B`를 추가적으로 학습하는 방식입니다. 이때 `lora_B`를 정규분포로 초기화하면, 초기 학습 단계에서 **적절한 변화를 유도**할 수 있습니다. 
   - 즉, 정규분포 초기화는 초기 단계에서 큰 변동 없이 **작은 변화**를 유도하는 데 적합합니다. 만약 가중치가 너무 무작위로 설정된다면, 학습이 불안정해지거나 더 많은 학습 시간이 필요할 수 있습니다.

### 4. **정규분포를 선택하는 이유**
   - 신경망의 가중치 초기화 방식은 종종 **He 초기화** 또는 **Xavier 초기화**처럼 잘 알려진 방식들이 사용되며, 그 중 많은 경우 정규분포나 균등분포를 사용합니다. 이는 수학적으로 검증된 방법으로, 학습의 성능을 개선하고 안정성을 보장하는 것으로 알려져 있습니다.
   - `lora_B`를 정규분포로 초기화하면 이러한 이점들을 가져갈 수 있습니다.

In [3]:
#F.embedding 추가 설명
import torch
import torch.nn.functional as F

# 가중치 행렬 (num_embeddings=5, embedding_dim=3)
weight = torch.tensor([[1, 0, 0], 
                       [0, 1, 0], 
                       [0, 0, 1],
                       [1, 1, 1],
                       [0.5, 0.5, 0.5]])

# 입력 인덱스
input = torch.tensor([0, 2, 3])

# 임베딩 계산
output = F.embedding(input, weight)

print(output)


tensor([[1., 0., 0.],
        [0., 0., 1.],
        [1., 1., 1.]])


위 코드를 보면:

가중치 행렬은 (5, 3) 크기입니다. 즉, 5개의 벡터가 있고, 각 벡터는 3차원입니다.
입력 인덱스는 [0, 2, 3]입니다. 이는 각각 가중치 행렬의 0번, 2번, 3번 행을 참조하라는 의미입니다.
결과적으로 F.embedding 함수는 해당 인덱스들에 대응하는 가중치 행렬의 행을 추출하여 다음과 같은 임베딩 벡터를 반환합니다:

tensor([[1, 0, 0],
        [0, 0, 1],
        [1, 1, 1]])

즉, 입력 인덱스 [0, 2, 3]에 대응하는 임베딩 벡터들이 가중치 행렬의 0번, 2번, 3번 행에서 추출된 것입니다.

**요약:**

F.embedding 함수는 입력된 인덱스 텐서를 가중치 행렬에서 찾아 임베딩 벡터를 반환하는 역할을 합니다.
인덱스에 대응하는 벡터를 추출하는 것이지, 가중치 행렬과 입력을 곱하는 연산은 아닙니다.
따라서 인덱스가 주어지면, 가중치 행렬에서 해당 인덱스에 대응하는 벡터를 바로 반환하는 방식입니다.

In [None]:
# LoRA가 적용된 Linear 레이어 정의
class Linear(nn.Linear, LoRALayer):
    #LoRA를 Linear 레이어에 적용
    def __init__(
        self, 
        in_features: int, # 입력 피처 수
        out_features: int, # 출력 피처 수
        r: int = 0, # Low-rank 차원
        lora_alpha: int = 1, # LoRA 스케일링 계수
        lora_dropout: float = 0., # 드롭아웃 확률
        fan_in_fan_out: bool = False, # (fan_in, fan_out) 구조 여부
        merge_weights: bool = True, # 가중치 병합 여부
        **kwargs
    ):
        nn.Linear.__init__(self, in_features, out_features, **kwargs) # nn.Linear 초기화
        