# Electra Finetuning

### Install Library

In [2]:
!pip install datasets transformers torch

[0m

### Load Dataset

In [3]:
# 1. Hugging Face 데이터셋 로드 (sentence-transformers/all-nli, subset: triplet)
from datasets import load_dataset

# "sentence-transformers/all-nli" 데이터셋의 "triplet" 설정을 불러옵니다.
dataset = load_dataset("sentence-transformers/all-nli", "triplet")
train_data = dataset["train"]
dev_data = dataset["dev"]
test_data = dataset["test"]

In [4]:
# 데이터 예시: anchor, positive, negative 세 가지 문장이 들어 있습니다.
print("샘플 데이터 예시:\n", train_data[0], "\n")

샘플 데이터 예시:
 {'anchor': 'A person on a horse jumps over a broken down airplane.', 'positive': 'A person is outdoors, on a horse.', 'negative': 'A person is at a diner, ordering an omelette.'} 



In [5]:
train_data = train_data.select(range(10000))  # 처음 10000개만 사용 (옵션)

### Init Model & Tokenizer

In [6]:
# 2. 모델과 토크나이저 초기화 (ELECTRA 기반 모델 사용)
from transformers import AutoTokenizer, AutoModel
import torch

model_name = "google/electra-small-discriminator"  # ELECTRA Small (더 큰 모델은 "google/electra-base-discriminator" 등 사용 가능)
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name)

# CPU/GPU 장치 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
print(f"사용 중인 device: {device}\n")

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/665 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/54.2M [00:00<?, ?B/s]

  return self.fget.__get__(instance, owner)()


사용 중인 device: cuda



model.safetensors:   0%|          | 0.00/54.2M [00:00<?, ?B/s]

### Define Loss-func

In [16]:
# 3. 손실 함수와 옵티마이저 설정 (TripletMarginLoss 및 AdamW)
import torch.nn as nn

loss_fn = nn.TripletMarginLoss(margin=1.0)  # margin=1.0으로 Triplet Margin Loss 정의
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-5)  # AdamW 옵티마이저

# Train

In [18]:
# 4. 훈련 루프 정의
def train_triplet(model, dataset, epochs=1, batch_size=32, log_interval=200):
    model.train()  # 모델을 훈련 모드로 전환
    for epoch in range(epochs):
        # 각 epoch마다 데이터를 섞어서 순서 무작위화
        dataset = dataset.shuffle(seed=epoch)
        total_loss = 0.0
        for i in range(0, len(dataset), batch_size):
            batch = dataset[i : i+batch_size]
            anchors = batch["anchor"]
            positives = batch["positive"]
            negatives = batch["negative"]
            
            # 입력 문장들을 토크나이징하여 텐서로 변환 (padding 및 truncation 적용)
            enc_anchors = tokenizer(anchors, return_tensors="pt", padding=True, truncation=True)
            enc_pos = tokenizer(positives, return_tensors="pt", padding=True, truncation=True)
            enc_neg = tokenizer(negatives, return_tensors="pt", padding=True, truncation=True)
            
            # device(cpu/gpu)로 tensors 이동
            enc_anchors = {key: val.to(device) for key, val in enc_anchors.items()}
            enc_pos = {key: val.to(device) for key, val in enc_pos.items()}
            enc_neg = {key: val.to(device) for key, val in enc_neg.items()}
            
            # 모델 forward 수행하여 각 입력의 hidden state 얻기
            out_anchor = model(**enc_anchors)
            out_positive = model(**enc_pos)
            out_negative = model(**enc_neg)
            
            # 문장 임베딩으로 [CLS] 토큰의 hidden state를 사용 (Electra는 pooler 없음)
            anchor_emb = out_anchor.last_hidden_state[:, 0, :]
            positive_emb = out_positive.last_hidden_state[:, 0, :]
            negative_emb = out_negative.last_hidden_state[:, 0, :]
            
            # Triplet margin 손실 계산
            loss = loss_fn(anchor_emb, positive_emb, negative_emb)
            
            # 역전파 및 모델 업데이트
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()
            # 일정 간격으로 진행 상황 출력
            if (i // batch_size) % log_interval == 0:
                print(f"Epoch {epoch+1}, Step {i//batch_size}, Loss: {loss.item():.4f}")
        avg_loss = total_loss / (len(dataset) // batch_size + 1)
        print(f"--> Epoch {epoch+1} 완료 (평균 손실: {avg_loss:.4f})\n")

    # ---- 학습 완료 후 모델 가중치 저장 ----
    # 학습된 모델의 state_dict()만 저장합니다.
    model_weights_path = "triplet_model_weights.pt"
    torch.save(model.state_dict(), model_weights_path)
    print(f"모델의 가중치를 '{model_weights_path}' 파일로 저장하였습니다.")
        

In [19]:
train_triplet(model, train_data, epochs=5, batch_size=32)

Epoch 1, Step 0, Loss: 0.1649
Epoch 1, Step 200, Loss: 0.2069
--> Epoch 1 완료 (평균 손실: 0.1655)

Epoch 2, Step 0, Loss: 0.0342
Epoch 2, Step 200, Loss: 0.1028
--> Epoch 2 완료 (평균 손실: 0.0938)

Epoch 3, Step 0, Loss: 0.0000
Epoch 3, Step 200, Loss: 0.0194
--> Epoch 3 완료 (평균 손실: 0.0626)

Epoch 4, Step 0, Loss: 0.0787
Epoch 4, Step 200, Loss: 0.1346
--> Epoch 4 완료 (평균 손실: 0.0420)

Epoch 5, Step 0, Loss: 0.0550
Epoch 5, Step 200, Loss: 0.0931
--> Epoch 5 완료 (평균 손실: 0.0329)

모델의 가중치를 'triplet_model_weights.pt' 파일로 저장하였습니다.


### Inference (Simple Inference)

In [21]:
# ===== 추론을 위한 모델 불러오기 =====
# 같은 구조의 모델을 사전학습 가중치 기반으로 초기화합니다.
inference_model = AutoModel.from_pretrained(model_name)
# 저장된 학습 가중치를 불러와서 모델에 적용합니다.
inference_model.load_state_dict(torch.load("triplet_model_weights.pt"))
# 모델을 장치(device)로 이동하고 평가 모드로 전환합니다.
inference_model.to(device)
inference_model.eval()
print("저장된 모델 가중치를 성공적으로 불러왔습니다.")

저장된 모델 가중치를 성공적으로 불러왔습니다.


In [22]:
# 추론(inference) 함수 정의
def infer_sentence(sentence: str):
    """단일 문장에 대한 추론을 수행하고, 임베딩 벡터를 반환합니다."""
    # 입력 문장을 토크나이즈하여 텐서로 변환
    encoding = tokenizer(sentence, return_tensors='pt', truncation=True, padding=True)
    encoding = {key: val.to(device) for key, val in encoding.items()}
    # 추론 단계에서는 gradient 계산 불필요(torch.no_grad() 사용)
    with torch.no_grad():
        outputs = inference_model(**encoding)
        # CLS 토큰의 은닉 상태를 임베딩으로 추출
        sentence_emb = outputs.last_hidden_state[:, 0, :].cpu().numpy()
    return sentence_emb

# 추론 예시
test_sentence = "이것은 테스트 문장입니다."
result_vector = infer_sentence(test_sentence)
print(f"입력 문장: '{test_sentence}'")
print(f"추론 결과 임베딩 벡터 크기: {result_vector.shape}")

입력 문장: '이것은 테스트 문장입니다.'
추론 결과 임베딩 벡터 크기: (1, 256)


### Inference (Most Similar)

In [13]:
import torch.nn.functional as F

def find_most_similar(model, anchor_text, candidate_texts):
    model.eval()  # 평가 모드 (dropout 비활성화 등)
    # 토크나이즈 및 텐서 변환
    enc_anchor = tokenizer(anchor_text, return_tensors="pt", padding=True, truncation=True).to(device)
    enc_candidates = tokenizer(candidate_texts, return_tensors="pt", padding=True, truncation=True).to(device)
    
    # Gradient 계산 비활성화 (inference)
    with torch.no_grad():
        anchor_output = model(**enc_anchor)
        cand_output = model(**enc_candidates)
    # 임베딩 추출 ([CLS] 토큰의 hidden state 사용)
    anchor_emb = anchor_output.last_hidden_state[:, 0, :]
    cand_emb = cand_output.last_hidden_state[:, 0, :]
    # 코사인 유사도 계산
    anchor_emb = F.normalize(anchor_emb, p=2, dim=1)  # 정규화 (크기 1의 벡터)
    cand_emb = F.normalize(cand_emb, p=2, dim=1)
    similarities = torch.matmul(cand_emb, anchor_emb.T).squeeze(1)  # 각 후보와 앵커의 코사인 유사도
    best_idx = int(torch.argmax(similarities))  # 가장 높은 유사도의 인덱스
    best_sentence = candidate_texts[best_idx]
    return best_idx, best_sentence, similarities.cpu().tolist()

In [14]:
# 앵커와 후보 문장을 설정합니다. (여기서는 테스트 세트의 첫 번째 예시를 사용합니다.)
anchor_example = test_data[0]["anchor"]
positive_example = test_data[0]["positive"]
negative_example = test_data[0]["negative"]
other_example = test_data[1]["negative"]  # 다른 무작위 문장 (negative 예시로 사용)

candidate_sentences = [positive_example, negative_example, other_example]

print("Anchor 문장:", anchor_example)
print("후보 문장들:", candidate_sentences)

Anchor 문장: This church choir sings to the masses as they sing joyous songs from the book at a church.
후보 문장들: ['The church is filled with song.', 'A choir singing at a baseball game.', 'The woman has been shot.']


In [15]:
# 가장 유사한 문장 찾기
best_idx, best_sentence, sims = find_most_similar(model, anchor_example, candidate_sentences)

# 결과 출력
for idx, (cand, sim) in enumerate(zip(candidate_sentences, sims)):
    print(f"후보[{idx}] 유사도: {sim:.4f} | 문장: {cand}")
print(f"=> 가장 유사한 문장 (예측 Positive): 후보[{best_idx}] \"{best_sentence}\"")

후보[0] 유사도: 0.8633 | 문장: The church is filled with song.
후보[1] 유사도: 0.8732 | 문장: A choir singing at a baseball game.
후보[2] 유사도: 0.9052 | 문장: The woman has been shot.
=> 가장 유사한 문장 (예측 Positive): 후보[2] "The woman has been shot."
