In [1]:
import pandas as pd
data = pd.read_excel('/content/drive/MyDrive/Colab Notebooks/project/Data4.xlsx', engine='openpyxl')

In [2]:
import pandas as pd
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
from transformers import BertTokenizer
from torch.utils.data import Dataset, DataLoader
import torch
import numpy as np
from tqdm import tqdm

In [3]:
data['label_idx'] = data['label_idx'].apply(lambda x: list(map(int, x.split(','))))

In [4]:
mlb = MultiLabelBinarizer()
data_labels = mlb.fit_transform(data['label_idx'])

num_classes = len(mlb.classes_)
tokenizer = BertTokenizer.from_pretrained('monologg/kobert')

In [5]:
train_sentences, test_sentences, train_labels, test_labels = train_test_split(data['SENTENCE'], data_labels, test_size=0.2)

In [None]:
# 클래스별 샘플 수 계산
label_counts = pd.Series(np.sum(train_labels, axis=0))

# 가장 많은 샘플을 가진 클래스의 샘플 수
max_samples = label_counts.max()

# 오버샘플링된 훈련 데이터셋 초기화
oversampled_sentences = []
oversampled_labels = []

with tqdm(total=num_classes, desc="Oversampling", bar_format="{l_bar}{bar} [ time left: {remaining} ]") as pbar:
    for i in range(num_classes):
        # 클래스 i에 속하는 샘플 찾기
        class_sentences = train_sentences[train_labels[:, i] == 1]
        class_labels = train_labels[train_labels[:, i] == 1]

        # 클래스에 샘플이 없으면 건너뛰기
        if len(class_sentences) == 0:
            pbar.update(1)
            continue

        # 필요한 경우 샘플 복제
        n_samples = label_counts[i]
        while n_samples < max_samples:
            diff = min(len(class_sentences), max_samples - n_samples)
            oversampled_sentences.extend(class_sentences[:diff])
            oversampled_labels.extend(class_labels[:diff])
            n_samples += diff

        # 원래 클래스 샘플 추가
        oversampled_sentences.extend(class_sentences)
        oversampled_labels.extend(class_labels)
        pbar.update(1)

# 데이터셋을 섞음
oversampled_sentences, oversampled_labels = shuffle(oversampled_sentences, oversampled_labels)

# 오버샘플링된 데이터셋으로 대체
train_sentences = oversampled_sentences
train_labels = np.array(oversampled_labels)

class SentenceDataset(Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx])
        return item

    def __len__(self):
        return len(self.labels)

train_encodings = tokenizer(list(train_sentences), truncation=True, padding=True)
test_encodings = tokenizer(list(test_sentences), truncation=True, padding=True)

train_dataset = SentenceDataset(train_encodings, train_labels)
test_dataset = SentenceDataset(test_encodings, test_labels)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=True)

Oversampling:   9%|▉          [ time left: 02:04 ]

In [None]:
from torch.utils.data import Dataset
import torch

class SentenceDataset(Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx])
        return item

    def __len__(self):
        return len(self.labels)


In [None]:
from torch.utils.data import DataLoader

train_encodings = tokenizer(list(train_sentences), truncation=True, padding=True)
test_encodings = tokenizer(list(test_sentences), truncation=True, padding=True)

train_dataset = SentenceDataset(train_encodings, train_labels)
test_dataset = SentenceDataset(test_encodings, test_labels)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=True)

In [None]:
from transformers import BertModel, get_linear_schedule_with_warmup
import torch.nn as nn
from transformers.optimization import AdamW
import numpy as np

# KoBERT 모델 불러오기
bert_model = BertModel.from_pretrained('monologg/kobert')

# CUDA 사용 가능하면 CUDA로, 아니면 CPU로 설정
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# 손실 함수 및 옵티마이저 정의
class_weights =[]
total_samples = len(data)
label_counts = pd.Series([item for sublist in mlb.inverse_transform(data_labels) for item in sublist]).value_counts()
for i in range(num_classes):
    # 클래스 i의 샘플 수
    class_count = label_counts.get(i, 0)

    # 클래스 가중치 계산: 클래스가 덜 나타날수록 더 큰 가중치를 부여
    if class_count > 0:
        weight = total_samples / (num_classes * class_count)
    else:
        weight = 0  # 클래스가 데이터셋에 전혀 나타나지 않는 경우

    class_weights.append(weight)

class_weights = torch.tensor(class_weights).to(device)



loss_fn = nn.BCEWithLogitsLoss(pos_weight=class_weights)

num_epochs = 5

total_steps = len(train_loader) * num_epochs

class CustomModel(nn.Module):
    def __init__(self, bert_model, num_classes):
        super(CustomModel, self).__init__()
        self.bert_model = bert_model
        self.num_classes = num_classes
        self.dropout = nn.Dropout(0.3)
        self.classifier = nn.Linear(self.bert_model.config.hidden_size, num_classes)

    def forward(self, input_ids, attention_mask):
        outputs = self.bert_model(input_ids=input_ids, attention_mask=attention_mask)
        pooled_output = outputs[1]
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)
        return logits

custom_model = CustomModel(bert_model, num_classes)
custom_model = custom_model.to(device)

optimizer = AdamW(custom_model.parameters(), lr=5e-5)
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_steps)



In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from torch import nn
from tqdm import tqdm
from transformers import get_linear_schedule_with_warmup
import numpy as np

# 학습 시작
for epoch in range(num_epochs):
    # Training
    total_loss = 0
    custom_model.train()
    for batch in tqdm(train_loader):
        optimizer.zero_grad()
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device).float()

        outputs = custom_model(input_ids=input_ids, attention_mask=attention_mask)
        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()
        scheduler.step()

        total_loss += loss.item()

    # Validation
    custom_model.eval()
    predictions, actuals = [], []
    with torch.no_grad():
        for batch in tqdm(test_loader):
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device).float()

            outputs = custom_model(input_ids=input_ids, attention_mask=attention_mask)
            preds = (torch.sigmoid(outputs) > 0.5).cpu().detach().numpy()
            predictions.extend(preds)
            actuals.extend(labels.cpu().detach().numpy())

    predictions = np.array(predictions)
    actuals = np.array(actuals)

    accuracy = accuracy_score(actuals.ravel(), predictions.ravel())
    precision = precision_score(actuals.ravel(), predictions.ravel(), average='micro')
    recall = recall_score(actuals.ravel(), predictions.ravel(), average='micro')
    f1 = f1_score(actuals.ravel(), predictions.ravel(), average='micro')

    print(f'Epoch {epoch+1}/{num_epochs}')
    print(f'Loss: {total_loss/len(train_loader):.4f}')
    print(f'Accuracy: {accuracy:.4f}')
    print(f'Precision: {precision:.4f}')
    print(f'Recall: {recall:.4f}')
    print(f'F1 Score: {f1:.4f}')


100%|██████████| 787/787 [01:40<00:00,  7.86it/s]
100%|██████████| 197/197 [00:05<00:00, 33.40it/s]
  _warn_prf(average, modifier, msg_start, len(result))


Epoch 1/5
Loss: 0.0643
Accuracy: 0.9996
Precision: 0.9992
Recall: 0.9996
F1 Score: 0.9994


 19%|█▉        | 153/787 [00:19<01:21,  7.78it/s]


KeyboardInterrupt: ignored

In [None]:
def predict_sentence(model, tokenizer, sentence, threshold=0.3):
    # 모델의 디바이스 확인 (CPU 또는 CUDA)
    device = next(model.parameters()).device

    # 문장 토큰화
    inputs = tokenizer(sentence, return_tensors='pt', truncation=True, padding=True)
    inputs = {k: v.to(device) for k, v in inputs.items() if k != 'token_type_ids'}

    # 모델 예측
    with torch.no_grad():
        outputs = model(**inputs)

    # 활성화 함수 적용 및 임계값을 사용하여 라벨 결정
    predictions = torch.sigmoid(outputs).cpu().numpy() > threshold  # .cpu()를 추가하여 GPU 텐서를 CPU로 이동

    # 예측된 라벨 인덱스 추출
    predicted_labels = [i for i, label in enumerate(predictions[0]) if label]

    return predicted_labels

# 임의의 문장을 사용하여 모델 테스트
for i in range(10):
  test_sentence = input()
  predicted_labels = predict_sentence(custom_model, tokenizer, test_sentence)

  print("Predicted label indices:", predicted_labels)
  if test_sentence == "수고하세요":
    break

지금 배달가능한가요?
Predicted label indices: [3971]
수고하세요
Predicted label indices: [1616]
