In [8]:
import torch
import numpy as np
from transformers import AutoTokenizer, AutoModelForCausalLM
from peft import LoraConfig, TaskType, get_peft_model

#############################################
# 1. 두 LoRA weight 행렬(W⁺, W⁻) 결합 함수 정의
#############################################
def combine_lora_weights(W_plus: np.ndarray, W_minus: np.ndarray, threshold: float = 0.9) -> np.ndarray:
    """
    두 LoRA weight 행렬 W_plus와 W_minus를 결합하여 새로운 weight W_new를 계산합니다.
    (W_new = W_plus + (W_minus - P W_minus), P는 두 행렬의 공통 subspace에 대한 projection)
    
    Args:
        W_plus (np.ndarray): alpaca-gpt4로 파인튜닝한 모델의 LoRA weight (W⁺)
        W_minus (np.ndarray): toxic 데이터셋으로 파인튜닝한 모델의 LoRA weight (W⁻)
        threshold (float): 공통 subspace를 선택할 singular value threshold
        
    Returns:
        np.ndarray: 결합된 새로운 weight W_new.
    """
    r = 4  # LoRA의 rank (두 모델 모두 r=4로 가정)
    lamb = 1.0
    # W_plus에 대한 SVD: W_plus = U_plus * S_plus * Vh_plus
    U_plus, S_plus, Vh_plus = np.linalg.svd(W_plus, full_matrices=False)
    U_plus = U_plus[:, :r]  # (out_features x r)
    
    # W_minus에 대한 SVD: W_minus = U_minus * S_minus * Vh_minus
    U_minus, S_minus, Vh_minus = np.linalg.svd(W_minus, full_matrices=False)
    U_minus = U_minus[:, :r]  # (out_features x r)
    
    # U_plus와 U_minus의 공통 subspace 찾기:
    X = np.dot(U_plus.T, U_minus)   # (r x r)
    U_m, singular_values, Vh_m = np.linalg.svd(X)
    
    # singular value가 threshold 이상인 방향 선택
    common_indices = np.where(singular_values >= threshold)[0]
    if common_indices.size == 0:
        print("Threshold 이상의 공통 subspace가 없습니다. 모든 r 방향을 사용합니다.")
        common_indices = np.arange(r)
        
    # U_common: U_plus의 선형 조합으로 공통 basis 구하기
    U_common = np.dot(U_plus, U_m[:, common_indices])  # (out_features x k), k <= r
    P = np.dot(U_common, U_common.T)  # Projection matrix onto common subspace
    
    # W_common: W_minus의 공통 성분
    W_common = np.dot(P, W_minus)
    
    # 최종적으로 결합한 weight
    W_new = W_plus - lamb * (W_minus - lamb * W_common)
    return W_new

#############################################
# 2. W_new를 LoRA의 두 행렬(lora_B, lora_A)로 분해하는 함수
#############################################
def factorize_weight(W_new: np.ndarray, r: int, scaling: float):
    """
    W_new(효과적인 LoRA 업데이트)를 lora_B와 lora_A로 분해합니다.
    LoRA 업데이트는 원래 (lora_B @ lora_A) * scaling 형태로 적용되므로,
    lora_B @ lora_A = W_new / scaling가 되어야 합니다.
    
    SVD를 통해 M = W_new/scaling = U S V^T 로 분해한 후,
    lora_B = U * sqrt(S)   (shape: [out_features, r])
    lora_A = sqrt(S) * V^T   (shape: [r, in_features])
    
    Args:
        W_new (np.ndarray): 결합된 effective weight update (out_features x in_features)
        r (int): LoRA의 rank (예제에서는 4)
        scaling (float): lora_alpha / r (예: 32/4 = 8)
        
    Returns:
        lora_B, lora_A: torch.Tensor로 변환된 분해 결과.
    """
    M = W_new / scaling  # (lora_B @ lora_A = M)
    U, S, Vh = np.linalg.svd(M, full_matrices=False)
    
    U_r = U[:, :r]      # (out_features x r)
    S_r = S[:r]         # (r,)
    Vh_r = Vh[:r, :]    # (r x in_features)
    
    sqrt_S = np.sqrt(S_r)
    lora_B = U_r * sqrt_S[np.newaxis, :]   # broadcasting, shape: (out_features x r)
    lora_A = sqrt_S[:, np.newaxis] * Vh_r    # shape: (r x in_features)
    
    # torch tensor로 변환 (dtype은 모델과 일치하도록)
    lora_B = torch.tensor(lora_B, dtype=torch.float16)
    lora_A = torch.tensor(lora_A, dtype=torch.float16)
    return lora_B, lora_A


In [9]:

#############################################
# 3. Qwen 모델 로드 및 LoRA 적용 (PEFT 방식)
#############################################
model_name = "Qwen/Qwen2.5-0.5B"
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token

model = AutoModelForCausalLM.from_pretrained(
    model_name,
    trust_remote_code=True,
    torch_dtype=torch.float16,
    device_map="auto"
)

# LoRA 설정 (원래 파인튜닝에 사용했던 target module 목록)
lora_config = LoraConfig(
    task_type=TaskType.CAUSAL_LM,
    r=4,
    lora_alpha=32,
    lora_dropout=0.1,
    target_modules=[
        "self_attn.q_proj",
        "self_attn.k_proj", 
        "self_attn.v_proj",
        "self_attn.o_proj",
        "mlp.gate_proj",
        "mlp.up_proj", 
        "mlp.down_proj"
    ]
)

# PEFT를 통해 모델에 LoRA 어댑터 추가
peft_model = get_peft_model(model, lora_config)


In [10]:
import torch
import numpy as np
from transformers import AutoTokenizer, AutoModelForCausalLM
from peft import PeftModel, LoraConfig, get_peft_model

# 🟢 Fine-tuned LoRA 모델 경로
path_plus = "./qwen-0.5b-lora-finetuned-alpaca-gpt4"   # W+
path_minus = "./qwen-0.5b-lora-finetuned-toxic"         # W-

# 원본 Qwen 모델 로드
model_name = "Qwen/Qwen2.5-0.5B"
base_model = AutoModelForCausalLM.from_pretrained(
    model_name, trust_remote_code=True, torch_dtype=torch.float16, device_map="auto"
)

# Fine-tuned LoRA 모델 불러오기
model_plus = PeftModel.from_pretrained(base_model, path_plus)
model_minus = PeftModel.from_pretrained(base_model, path_minus)

# 모델의 state_dict 가져오기
state_dict_plus = model_plus.state_dict()
state_dict_minus = model_minus.state_dict()

# 🟢 LoRA Target Modules
target_modules = [
    "self_attn.q_proj",
    "self_attn.k_proj", 
    "self_attn.v_proj",
    "self_attn.o_proj",
    "mlp.gate_proj",
    "mlp.up_proj", 
    "mlp.down_proj"
]

# LoRA 설정 (새로운 모델에 적용)
lora_config = LoraConfig(
    task_type="CAUSAL_LM",
    r=4,
    lora_alpha=32,
    lora_dropout=0.1,
    target_modules=target_modules
)

# 새로운 PEFT 모델 생성
new_peft_model = get_peft_model(base_model, lora_config)

# LoRA scaling factor
scaling = lora_config.lora_alpha / lora_config.r  # 예: 32/4 = 8

# 🟢 LoRA weight를 결합하여 새로운 W_new 생성
for layer_idx in range(24):  # Qwen-0.5B는 24개의 Transformer layer를 가짐
    for target_module in target_modules:
        # LoRA weight 키 생성 (각 레이어에 대해)
        key_A = f"base_model.model.model.layers.{layer_idx}.{target_module}.lora_A.default.weight"
        key_B = f"base_model.model.model.layers.{layer_idx}.{target_module}.lora_B.default.weight"

        # 키가 존재하는 경우에만 업데이트 수행
        if key_A in state_dict_plus and key_B in state_dict_plus:
            # 기존 W+와 W- 불러오기 (torch.Tensor → numpy 변환)
            W_plus = (state_dict_plus[key_B] @ state_dict_plus[key_A]).cpu().numpy()
            W_minus = (state_dict_minus[key_B] @ state_dict_minus[key_A]).cpu().numpy()

            # W_new 생성
            W_new = combine_lora_weights(W_plus, W_minus)

            # W_new를 SVD 분해하여 lora_A, lora_B로 복구
            lora_B, lora_A = factorize_weight(W_new, r=lora_config.r, scaling=scaling)

            # 새로운 모델의 LoRA weight 업데이트 (torch.Tensor 형태로 변환)
            with torch.no_grad():
                state_dict_plus[key_A].copy_(lora_A.to(state_dict_plus[key_A].dtype))
                state_dict_plus[key_B].copy_(lora_B.to(state_dict_plus[key_B].dtype))

print("새로운 LoRA 모델이 정상적으로 적용되었습니다!")

# 🟢 새로운 LoRA 모델 저장
save_path = "./qwen-0.5b-unlearned-lora"
new_peft_model.save_pretrained(save_path)
print(f"새로운 결합된 LoRA 모델이 저장되었습니다: {save_path}")


새로운 LoRA 모델이 정상적으로 적용되었습니다!
새로운 결합된 LoRA 모델이 저장되었습니다: ./qwen-0.5b-unlearned-lora
