In [1]:
!pip install tqdm

[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.3.1[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m


In [2]:
import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer, TextClassificationPipeline
from datasets import load_dataset
from sklearn.metrics import f1_score
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import numpy as np
from tqdm import tqdm

In [3]:
model_name = "searle-j/kote_for_easygoing_people"
teacher_model = AutoModelForSequenceClassification.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)

# 데이터셋 로드
dataset = load_dataset("searle-j/kote")

  return self.fget.__get__(instance, owner)()
You can avoid this message in future by passing the argument `trust_remote_code=True`.
Passing `trust_remote_code=True` will be mandatory to load this dataset from the next major release of `datasets`.


In [4]:

pipe = TextClassificationPipeline(
    model=teacher_model,
    tokenizer=tokenizer,
    device=0 if torch.cuda.is_available() else -1,  # gpu number, -1 if cpu used
    return_all_scores=True,
    function_to_apply='sigmoid'
)



In [5]:
def tokenize_and_encode(examples):
    inputs = tokenizer(examples['text'], padding='max_length', truncation=True, max_length=512)
    labels = torch.zeros((len(examples['labels']), teacher_model.config.num_labels))
    for i, label_list in enumerate(examples['labels']):
        for label in label_list:
            labels[i][label] = 1
    inputs['labels'] = labels
    return inputs

tokenized_dataset = dataset.map(tokenize_and_encode, batched=True)

In [6]:
# 커스텀 데이터셋 클래스 정의
class CustomDataset(Dataset):
    def __init__(self, encodings):
        self.encodings = encodings

    def __getitem__(self, idx):
        # 여기서 필요한 키만 선택하여 텐서로 변환합니다.
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items() if key in ['input_ids', 'attention_mask', 'labels']}
        return item

    def __len__(self):
        return len(self.encodings['input_ids'])


In [7]:
def convert_to_tensors(dataset):
    return {
        'input_ids': torch.tensor(dataset['input_ids']),
        'attention_mask': torch.tensor(dataset['attention_mask']),
        'labels': torch.tensor(dataset['labels'])
    }

In [8]:
train_dataset = CustomDataset(convert_to_tensors(tokenized_dataset['train']))
val_dataset = CustomDataset(convert_to_tensors(tokenized_dataset['validation']))
test_dataset = CustomDataset(convert_to_tensors(tokenized_dataset['test']))

# 데이터 로더 생성
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16)
test_loader = DataLoader(test_dataset, batch_size=16)

In [9]:

# 학생 모델 정의 (LSTM 기반)
class LSTMStudentModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, n_layers):
        super(LSTMStudentModel, self).__init__()
        self.embedding = nn.Embedding(input_dim, hidden_dim)
        self.lstm = nn.LSTM(hidden_dim, hidden_dim, n_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.sigmoid = nn.Sigmoid()

    def forward(self, input_ids):
        embedded = self.embedding(input_ids)
        lstm_out, _ = self.lstm(embedded)
        lstm_out = lstm_out[:, -1, :]
        output = self.fc(lstm_out)
        return self.sigmoid(output)


In [10]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [11]:
input_dim = tokenizer.vocab_size
hidden_dim = 256
output_dim = teacher_model.config.num_labels
n_layers = 2
student_model = LSTMStudentModel(input_dim, hidden_dim, output_dim, n_layers).to(device)

# 옵티마이저 및 손실 함수 설정
criterion = nn.BCELoss()
optimizer = optim.Adam(student_model.parameters(), lr=1e-3)

In [12]:
def distillation_loss(student_logits, teacher_logits, temperature):
    teacher_probs = nn.functional.softmax(teacher_logits / temperature, dim=-1)
    student_log_probs = nn.functional.log_softmax(student_logits / temperature, dim=-1)
    return nn.functional.kl_div(student_log_probs, teacher_probs, reduction='batchmean') * (temperature ** 2)


In [13]:
def train(model, teacher_model, dataloader, optimizer, criterion, device, temperature=2.0, alpha=0.5):
    model.train()
    teacher_model.eval()
    for batch in tqdm(dataloader, desc="Training"):
        optimizer.zero_grad()
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].float().to(device)
        
        with torch.no_grad():
            teacher_outputs = teacher_model(input_ids=input_ids, attention_mask=attention_mask)
            teacher_logits = teacher_outputs.logits

        student_outputs = model(input_ids)
        student_loss = criterion(student_outputs, labels)
        distill_loss = distillation_loss(student_outputs, teacher_logits, temperature)
        loss = alpha * student_loss + (1 - alpha) * distill_loss

        loss.backward()
        optimizer.step()

In [16]:
def evaluate(model, dataloader, device):
    model.eval()
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for batch in tqdm(dataloader, desc="Evaluating"):
            input_ids = batch['input_ids'].to(device)
            labels = batch['labels'].float().to(device)
            outputs = model(input_ids)
            preds = (outputs > 0.3).float()
            all_preds.append(preds.cpu().numpy())
            all_labels.append(labels.cpu().numpy())
    all_preds = np.concatenate(all_preds, axis=0)
    all_labels = np.concatenate(all_labels, axis=0)
    return f1_score(all_labels, all_preds, average='macro')

In [17]:
num_epochs = 10
for epoch in range(num_epochs):
    train(student_model, teacher_model, train_loader, optimizer, criterion, device)
    val_f1 = evaluate(student_model, val_loader, device)
    print(f'Epoch {epoch+1}/{num_epochs}, Validation Macro F1: {val_f1:.4f}')

  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items() if key in ['input_ids', 'attention_mask', 'labels']}
Training: 100%|██████████| 2500/2500 [06:39<00:00,  6.25it/s]
Evaluating: 100%|██████████| 313/313 [00:03<00:00, 95.92it/s] 


Epoch 1/10, Validation Macro F1: 0.1408


  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items() if key in ['input_ids', 'attention_mask', 'labels']}
Training: 100%|██████████| 2500/2500 [06:40<00:00,  6.24it/s]
Evaluating: 100%|██████████| 313/313 [00:03<00:00, 97.36it/s] 


Epoch 2/10, Validation Macro F1: 0.1222


  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items() if key in ['input_ids', 'attention_mask', 'labels']}
Training: 100%|██████████| 2500/2500 [06:40<00:00,  6.25it/s]
Evaluating: 100%|██████████| 313/313 [00:03<00:00, 94.73it/s] 


Epoch 3/10, Validation Macro F1: 0.1398


  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items() if key in ['input_ids', 'attention_mask', 'labels']}
Training: 100%|██████████| 2500/2500 [06:40<00:00,  6.25it/s]
Evaluating: 100%|██████████| 313/313 [00:03<00:00, 95.31it/s]


Epoch 4/10, Validation Macro F1: 0.1491


  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items() if key in ['input_ids', 'attention_mask', 'labels']}
Training: 100%|██████████| 2500/2500 [06:40<00:00,  6.24it/s]
Evaluating: 100%|██████████| 313/313 [00:03<00:00, 97.37it/s]


Epoch 5/10, Validation Macro F1: 0.1408


  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items() if key in ['input_ids', 'attention_mask', 'labels']}
Training: 100%|██████████| 2500/2500 [06:41<00:00,  6.23it/s]
Evaluating: 100%|██████████| 313/313 [00:03<00:00, 95.68it/s]


Epoch 6/10, Validation Macro F1: 0.1408


  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items() if key in ['input_ids', 'attention_mask', 'labels']}
Training: 100%|██████████| 2500/2500 [06:40<00:00,  6.23it/s]
Evaluating: 100%|██████████| 313/313 [00:03<00:00, 88.83it/s]


Epoch 7/10, Validation Macro F1: 0.1408


  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items() if key in ['input_ids', 'attention_mask', 'labels']}
Training: 100%|██████████| 2500/2500 [06:41<00:00,  6.23it/s]
Evaluating: 100%|██████████| 313/313 [00:03<00:00, 91.04it/s]


Epoch 8/10, Validation Macro F1: 0.1491


  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items() if key in ['input_ids', 'attention_mask', 'labels']}
Training: 100%|██████████| 2500/2500 [06:40<00:00,  6.23it/s]
Evaluating: 100%|██████████| 313/313 [00:03<00:00, 94.67it/s]


Epoch 9/10, Validation Macro F1: 0.1491


  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items() if key in ['input_ids', 'attention_mask', 'labels']}
Training: 100%|██████████| 2500/2500 [06:41<00:00,  6.23it/s]
Evaluating: 100%|██████████| 313/313 [00:03<00:00, 93.70it/s]

Epoch 10/10, Validation Macro F1: 0.1491





In [18]:
# 테스트 세트 평가
test_f1 = evaluate(student_model, test_loader, device)
print(f'Test Macro F1: {test_f1:.4f}')

  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items() if key in ['input_ids', 'attention_mask', 'labels']}
Evaluating: 100%|██████████| 313/313 [00:03<00:00, 98.08it/s]


Test Macro F1: 0.1500


In [19]:
import time

In [21]:
teacher_pipe = TextClassificationPipeline(
    model=teacher_model,
    tokenizer=tokenizer,
    device=0 if torch.cuda.is_available() else -1,  # gpu number, -1 if cpu used
    return_all_scores=True,
    function_to_apply='sigmoid'
)

# 학생 모델에 대한 파이프라인 생성
def student_predict(model, tokenizer, text, device):
    model.eval()
    inputs = tokenizer(text, return_tensors='pt', padding='max_length', truncation=True, max_length=512).to(device)
    with torch.no_grad():
        outputs = model(inputs['input_ids'])
    return outputs.cpu().numpy()

In [22]:
test_input = """재미있어요! 재미는 확실히 있는데 뭐랄까... 너무 정신 없달까...ㅋㅋ"""

In [23]:
# 선생님 모델 추론 시간 측정
start_time = time.time()
teacher_output = teacher_pipe(test_input)[0]
teacher_time = time.time() - start_time

# 학생 모델 추론 시간 측정
start_time = time.time()
student_output = student_predict(student_model, tokenizer, test_input, device)
student_time = time.time() - start_time

In [26]:
# 결과 출력
print("Teacher Model Inference Time: {:.6f} seconds".format(teacher_time))
print("Student Model Inference Time: {:.6f} seconds".format(student_time))

print("\nTeacher Model Output:")
for output in teacher_output:
    if output["score"] > 0.4:
        print(output)

print("\nStudent Model Output:")
for idx, score in enumerate(student_output[0]):
    if score > 0.4:
        print({'label': f'label_{idx}', 'score': score})

Teacher Model Inference Time: 0.019019 seconds
Student Model Inference Time: 0.009911 seconds

Teacher Model Output:
{'label': '안타까움/실망', 'score': 0.7095019817352295}
{'label': '즐거움/신남', 'score': 0.8421469330787659}
{'label': '당황/난처', 'score': 0.4448468089103699}
{'label': '행복', 'score': 0.46983569860458374}
{'label': '기쁨', 'score': 0.7035971879959106}

Student Model Output:
{'label': 'label_0', 'score': 0.83225137}
{'label': 'label_2', 'score': 0.5983677}
{'label': 'label_6', 'score': 0.5548921}
{'label': 'label_8', 'score': 0.5338372}
{'label': 'label_10', 'score': 0.7826703}
{'label': 'label_15', 'score': 0.40680328}
{'label': 'label_22', 'score': 0.67906785}
{'label': 'label_23', 'score': 0.6680849}
{'label': 'label_28', 'score': 0.65936023}
{'label': 'label_42', 'score': 0.54550856}


In [27]:
print("\nTeacher Model Structure:")
print(teacher_model)

print("\nStudent Model Structure:")
print(student_model)


Teacher Model Structure:
ElectraForSequenceClassification(
  (electra): ElectraModel(
    (embeddings): ElectraEmbeddings(
      (word_embeddings): Embedding(50135, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): ElectraEncoder(
      (layer): ModuleList(
        (0-11): 12 x ElectraLayer(
          (attention): ElectraAttention(
            (self): ElectraSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): ElectraSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
  

In [28]:
def evaluate_acc(model, dataloader, device):
    model.eval()
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for batch in tqdm(dataloader, desc="Evaluating"):
            input_ids = batch['input_ids'].to(device)
            labels = batch['labels'].float().to(device)
            outputs = model(input_ids)
            preds = (outputs > 0.5).float()
            all_preds.append(preds.cpu().numpy())
            all_labels.append(labels.cpu().numpy())
    all_preds = np.concatenate(all_preds, axis=0)
    all_labels = np.concatenate(all_labels, axis=0)
    return f1_score(all_labels, all_preds, average='macro'), accuracy_score(all_labels, (all_preds > 0.5))

In [30]:
from sklearn.metrics import accuracy_score

In [31]:
test_f1, test_accuracy = evaluate_acc(student_model, test_loader, device)
print(f'Teacher - Test Macro F1: {test_f1:.4f}, Test Accuracy: {test_accuracy:.4f}')

test_f1, test_accuracy = evaluate_acc(student_model, test_loader, device)
print(f'Student - Test Macro F1: {test_f1:.4f}, Test Accuracy: {test_accuracy:.4f}')

  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items() if key in ['input_ids', 'attention_mask', 'labels']}
Evaluating: 100%|██████████| 313/313 [00:03<00:00, 100.15it/s]


Teacher - Test Macro F1: 0.1014, Test Accuracy: 0.0000


  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items() if key in ['input_ids', 'attention_mask', 'labels']}
Evaluating: 100%|██████████| 313/313 [00:03<00:00, 100.00it/s]


Student - Test Macro F1: 0.1014, Test Accuracy: 0.0000
