In [96]:
from konlpy.tag import Okt
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
from tqdm import tqdm
import os
import json

In [97]:
class SentenceDataset(Dataset):
    """Map-style dataset that turns raw sentences into fixed-length id tensors.

    Each item is a ``(tokens, label)`` pair of ``torch.long`` tensors.
    ``tokens`` holds exactly ``max_length`` vocabulary indices: longer
    sentences are truncated, shorter ones are right-padded with
    ``vocab['<PAD>']``. Tokens missing from ``vocab`` map to ``vocab['<UNK>']``.

    Args:
        texts: Sequence of raw sentences.
        labels: Sequence of label values convertible to a ``torch.long``
            tensor, aligned with ``texts`` by index.
        vocab: Mapping from token to integer index. Must contain ``'<UNK>'``,
            and ``'<PAD>'`` whenever a sentence is shorter than ``max_length``.
        max_length: Number of token ids every item is truncated/padded to.
        tokenizer: Optional callable taking a sentence and returning an
            iterable of tokens. When omitted, the notebook-level
            ``okt.morphs`` is looked up at item-access time, which is the
            behaviour this class had before the parameter existed.
    """

    def __init__(self, texts, labels, vocab, max_length, tokenizer=None):
        self.texts = texts
        self.labels = labels
        self.vocab = vocab
        self.max_length = max_length
        # Injecting the tokenizer removes the hidden dependency on a global
        # that is only defined in a later notebook cell.
        self.tokenizer = tokenizer

    def __len__(self):
        return len(self.texts)

    def _tokenize(self, text):
        """Split ``text`` into tokens with the injected or the global tokenizer."""
        if self.tokenizer is not None:
            return self.tokenizer(text)
        # Backward-compatible fallback: resolved lazily, so it only needs to
        # exist by the time items are actually fetched.
        return okt.morphs(text)

    def __getitem__(self, idx):
        """Return ``(tokens, label)`` for the sentence at position ``idx``."""
        unk_id = self.vocab['<UNK>']
        ids = [self.vocab.get(token, unk_id) for token in self._tokenize(self.texts[idx])]

        # Force the sequence to exactly max_length entries.
        ids = ids[:self.max_length]
        shortfall = self.max_length - len(ids)
        if shortfall > 0:
            ids += [self.vocab['<PAD>']] * shortfall

        tokens = torch.tensor(ids, dtype=torch.long)
        label = torch.tensor(self.labels[idx], dtype=torch.long)
        return tokens, label

In [98]:
# Name of the evaluation attribute whose labels are used as the prediction target.
predict_target = "linguistic_acceptability"

In [99]:
# Load the data.
# The JSON files must be in the same folder this notebook is run from.
texts, labels = [], []

for i in range(1, 100):
    file_name = f'기술_과학_{i}.json'

    # Only files that actually exist are processed.
    if not os.path.exists(file_name):
        continue

    with open(file_name, 'r', encoding='utf-8') as fp:
        payload = json.load(fp)

    conversations = payload["dataset"].get("conversations", [])
    for conv in conversations:
        for utt in conv.get("utterances", []):
            scores = utt.get("utterance_evaluation", [])
            # Utterances with no evaluation entries are not prediction targets.
            if not scores:
                continue
            texts.append(utt.get("utterance_text", ""))
            labels.append(scores)

In [100]:
# Extract one binary label list per evaluation attribute.
#
# Each element of `labels` is the list of evaluation entries collected for one
# utterance. An attribute gets label 1 only when every entry for that
# utterance answered 'yes' for it; otherwise it gets 0.

attribute_labels = {
    name: []
    for name in (
        'linguistic_acceptability',
        'consistency',
        'interestingness',
        'unbias',
        'harmlessness',
        'no_hallucination',
        'understandability',
        'sensibleness',
        'specificity',
    )
}

for evaluations in labels:
    for key, column in attribute_labels.items():
        yes_count = sum(1 for entry in evaluations if entry.get(key) == 'yes')
        column.append(int(yes_count == len(evaluations)))

In [101]:
# Tokenization
okt = Okt()
tokenized_sentences = [okt.morphs(sentence) for sentence in texts]

# Indices 0 and 1 are reserved for the padding and unknown-token markers;
# every other token receives the next free index in order of first appearance.
vocab = {'<PAD>': 0, '<UNK>': 1}
for sentence_tokens in tokenized_sentences:
    for token in sentence_tokens:
        vocab.setdefault(token, len(vocab))

# Every sentence is truncated/padded to this many token ids.
max_length = 10
dataset = SentenceDataset(
    texts,
    attribute_labels[predict_target],
    vocab,
    max_length,
)
train_loader = DataLoader(dataset, batch_size=32, shuffle=True)

In [102]:
# Models
class LSTMClassifier(nn.Module):
    """Classifier: embedding -> LSTM -> linear head on the last layer's final hidden state.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each embedding vector.
        hidden_dim: Size of the LSTM hidden state.
        output_dim: Number of logits produced per input sequence.
        padding_idx: Vocabulary index passed to ``nn.Embedding`` as ``padding_idx``.

    Note:
        ``affine`` is constructed but ``forward`` never calls it.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, padding_idx):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=padding_idx)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        # Not used by forward(); kept so the parameter set stays the same.
        self.affine = nn.Linear(hidden_dim, hidden_dim)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Return logits for a batch of token-id sequences ``x``."""
        _sequence_output, state = self.lstm(self.embedding(x))
        final_hidden, _final_cell = state
        # final_hidden[-1] selects the last LSTM layer's hidden state.
        return self.fc(final_hidden[-1])

class RNNClassifier(nn.Module):
    """Classifier: embedding -> RNN -> two stacked linear layers on the final hidden state.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each embedding vector.
        hidden_dim: Size of the RNN hidden state (and of the intermediate linear layer).
        output_dim: Number of logits produced per input sequence.
        padding_idx: Vocabulary index passed to ``nn.Embedding`` as ``padding_idx``.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, padding_idx):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=padding_idx)
        self.rnn = nn.RNN(embedding_dim, hidden_dim, batch_first=True)
        self.affine = nn.Linear(hidden_dim, hidden_dim)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Return logits for a batch of token-id sequences ``x``."""
        _sequence_output, final_hidden = self.rnn(self.embedding(x))
        # final_hidden[-1] selects the last RNN layer's hidden state.
        projected = self.affine(final_hidden[-1])
        return self.fc(projected)

In [103]:
# Training

# Seed the torch RNG so weight initialisation and DataLoader shuffling repeat
# across runs (GPU kernels may still introduce small run-to-run differences).
SEED = 42
torch.manual_seed(SEED)

vocab_size = len(vocab)
embedding_dim = 50
hidden_dim = 64

# CrossEntropyLoss treats labels as class indices, so the number of logits
# must exceed the largest label. Counting distinct labels under-sizes the
# output layer when a class is missing from the training split (e.g. only
# label 1 present -> 1 logit, target index 1 out of range).
train_targets = attribute_labels[predict_target]
output_dim = max(train_targets, default=0) + 1  # number of classes

padding_idx = vocab['<PAD>']
context_size = 5  # not read by any code in this cell; kept in case other cells reference it

model = LSTMClassifier(vocab_size, embedding_dim, hidden_dim, output_dim, padding_idx)
# Alternative architecture:
# model = RNNClassifier(vocab_size, embedding_dim, hidden_dim, output_dim, padding_idx)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
epochs = 10

model.train()
for epoch in range(epochs):
    print(f"Epoch {epoch + 1}/{epochs}")
    epoch_loss = 0
    # The batch variables are deliberately NOT named `labels`: that would
    # overwrite the notebook-level `labels` list with a tensor and break a
    # re-run of the label-extraction cell.
    for batch_ids, batch_labels in tqdm(train_loader):
        batch_ids, batch_labels = batch_ids.to(device), batch_labels.to(device)

        optimizer.zero_grad()
        outputs = model(batch_ids)
        loss = criterion(outputs, batch_labels)
        loss.backward()
        optimizer.step()

        # Running sum of per-batch losses (not averaged over batches).
        epoch_loss += loss.item()
    print(f"Epoch Loss: {epoch_loss:.4f}")

Epoch 1/10


100%|██████████████████████████████████████████████████████████████████████████████████| 23/23 [00:02<00:00, 10.89it/s]


Epoch Loss: 13.7643
Epoch 2/10


100%|██████████████████████████████████████████████████████████████████████████████████| 23/23 [00:02<00:00, 11.32it/s]


Epoch Loss: 10.8150
Epoch 3/10


100%|██████████████████████████████████████████████████████████████████████████████████| 23/23 [00:02<00:00, 11.38it/s]


Epoch Loss: 9.6210
Epoch 4/10


100%|██████████████████████████████████████████████████████████████████████████████████| 23/23 [00:01<00:00, 12.12it/s]


Epoch Loss: 8.4565
Epoch 5/10


100%|██████████████████████████████████████████████████████████████████████████████████| 23/23 [00:02<00:00, 11.48it/s]


Epoch Loss: 7.2943
Epoch 6/10


100%|██████████████████████████████████████████████████████████████████████████████████| 23/23 [00:02<00:00, 11.43it/s]


Epoch Loss: 6.1140
Epoch 7/10


100%|██████████████████████████████████████████████████████████████████████████████████| 23/23 [00:02<00:00, 11.39it/s]


Epoch Loss: 4.8680
Epoch 8/10


100%|██████████████████████████████████████████████████████████████████████████████████| 23/23 [00:01<00:00, 11.99it/s]


Epoch Loss: 4.0508
Epoch 9/10


100%|██████████████████████████████████████████████████████████████████████████████████| 23/23 [00:01<00:00, 11.63it/s]


Epoch Loss: 2.9164
Epoch 10/10


100%|██████████████████████████████████████████████████████████████████████████████████| 23/23 [00:01<00:00, 11.58it/s]

Epoch Loss: 2.2179





In [104]:
# Prediction function
def predict(sentence):
    """Return the predicted class index for a single sentence.

    The sentence is tokenized with the notebook-level ``okt``, mapped to ids
    through ``vocab`` (unknown tokens become ``vocab['<UNK>']``), truncated or
    right-padded with ``vocab['<PAD>']`` to ``max_length`` ids, and passed to
    ``model`` as a batch of one. The model is switched to eval mode first.

    Args:
        sentence: Raw sentence to classify.

    Returns:
        The index of the largest logit, as a Python ``int``.
    """
    model.eval()

    encoded = []
    for morph in okt.morphs(sentence):
        encoded.append(vocab.get(morph, vocab['<UNK>']))

    # A negative shortfall (sentence longer than max_length) yields no padding.
    shortfall = max_length - len(encoded)
    padded = encoded[:max_length] + [vocab['<PAD>']] * shortfall

    batch = torch.tensor([padded], dtype=torch.long).to(device)
    with torch.no_grad():
        logits = model(batch)
        best_class = torch.argmax(logits, dim=1).item()
    return best_class

In [105]:
# Test code: load the held-out files, rebuild the per-attribute labels for
# them, and run the trained model on every utterance.
# Note: this rebinds `texts`, `labels` and `attribute_labels` to the test data.
texts, labels = [], []

for i in range(100, 200):
    file_name = f'기술_과학_{i}.json'

    # Only files that actually exist are processed.
    if not os.path.exists(file_name):
        continue

    with open(file_name, 'r', encoding='utf-8') as fp:
        payload = json.load(fp)

    conversations = payload["dataset"].get("conversations", [])
    for conv in conversations:
        for utt in conv.get("utterances", []):
            scores = utt.get("utterance_evaluation", [])
            # Utterances with no evaluation entries are not prediction targets.
            if not scores:
                continue
            texts.append(utt.get("utterance_text", ""))
            labels.append(scores)

attribute_labels = {
    name: []
    for name in (
        'linguistic_acceptability',
        'consistency',
        'interestingness',
        'unbias',
        'harmlessness',
        'no_hallucination',
        'understandability',
        'sensibleness',
        'specificity',
    )
}

# Extract one binary label per attribute: 1 only when every evaluation entry
# of the utterance answered 'yes' for that attribute, otherwise 0.
for evaluations in labels:
    for key, column in attribute_labels.items():
        yes_count = sum(1 for entry in evaluations if entry.get(key) == 'yes')
        column.append(int(yes_count == len(evaluations)))

# Predicted class index for every test utterance, in the same order as `texts`.
result = [predict(sentence) for sentence in texts]

In [106]:
from sklearn.metrics import accuracy_score

# Fraction of test utterances whose predicted class matches the target label.
accuracy_score(
    y_true=attribute_labels[predict_target],
    y_pred=result,
)

0.7490247074122237

In [107]:
# Number of test utterances for which the model predicted class 0.
sum(1 for predicted in result if predicted == 0)

213