In [1]:
import pandas as pd

# NSMC 훈련용 코퍼스 내 문장 및 라벨 데이터 추출

f_train = pd.read_csv('맛집리뷰text.txt', sep='\t')
train_pair = [(row[1], row[2]) for _, row in f_train.iterrows() if type(row[1]) == str]

train_data  = [pair[0] for pair in train_pair]
train_label = [pair[1] for pair in train_pair]

In [2]:
# 추출된 문장 및 라벨 데이터 일부 확인

for data, label in zip(train_data[:3], train_label[:3]):
    print(f'문장: {data}\n라벨: {label}\n')

문장: 가성비 최고에 맛까지 좋은 노포 식당입니다..
라벨: 1

문장: 보통의 닭곰탕집들처럼 노계나 폐닭을 사용하지 않고 일반 육계를 사용해서 잡내없고 육질이 참 부드럽습니다..
라벨: 1

문장: 멸치육수베이스의 칼국수도 딱 멸치육수의 교과서같은 맛입니다..
라벨: 1



In [3]:
# 프로젝트 전반에 사용될 변수 사전 정의

params = {
    'batch_size': 64,
    'num_epoch': 15,
    'lr': 0.003,
    'dropout': 0.5,
    'min_frequency': 3,
    'max_len': 20,
    
    'vocab_size': 20000,
    'embed_dim': 100,
    'hidden_dim': 256,
    'filter_sizes': [2, 3, 4],
    'num_filters': 100,
    'output_dim': 1,
}

In [4]:
# 추출한 문장 데이터 텍스트 파일로 저장

with open('train_tokenizer.txt', 'w', encoding='utf-8') as f:
    for line in train_data:
        print(line, file=f)

In [5]:
from tokenizers import BertWordPieceTokenizer

# BertWordPieceTokenizer 초기화

tokenizer = BertWordPieceTokenizer()

# 앞서 제작한 텍스트 파일 활용해 토크나이저 훈련

trainer = tokenizer.train(
    files=['train_tokenizer.txt'],
    vocab_size=params['vocab_size'],
    min_frequency=params['min_frequency'],
    special_tokens=['[PAD]', '[SOS]', '[EOS]', '[UNK]'],
    wordpieces_prefix="##"
)

In [6]:
# 토크나이저 파라미터 확인

tokenizer._parameters

{'model': 'BertWordPiece',
 'unk_token': '[UNK]',
 'sep_token': '[SEP]',
 'cls_token': '[CLS]',
 'pad_token': '[PAD]',
 'mask_token': '[MASK]',
 'clean_text': True,
 'handle_chinese_chars': True,
 'strip_accents': None,
 'lowercase': True,
 'wordpieces_prefix': '##'}

In [7]:
# 패딩 토큰 인덱스 확인

print(tokenizer.token_to_id('[PAD]'))

0


In [8]:
# 스페셜 토큰 변수화

pad_idx = tokenizer.token_to_id('[PAD]')
sos_idx = tokenizer.token_to_id('[SOS]')
eos_idx = tokenizer.token_to_id('[EOS]')

In [9]:
# 토크나이저에 대해 패딩 옵션 설정
tokenizer.enable_padding(pad_id=pad_idx, pad_token='[PAD]')

In [10]:
# 'encode_batch' 함수 이용해 훈련 데이터셋에 대해 토크나이즈 수행

encoded_data = tokenizer.encode_batch(train_data)

In [11]:
# 데이터 개수 확인

print(f'훈련 데이터:\t{len(train_data)} 개')
print(f'훈련 라벨:\t{len(train_label)} 개')
print(f'인코딩 데이터:\t{len(encoded_data)} 개')

훈련 데이터:	39 개
훈련 라벨:	39 개
인코딩 데이터:	39 개


In [12]:
# 토크나이저 초기 훈련 결과 확인

print(f'토큰: {encoded_data[20].tokens}\n')
print(f'아이디: {encoded_data[20].ids}')

토큰: ['혼', '##ᄇ', '##ᅡᆸ', '##하', '##기', '좋아요', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '

In [13]:
# 후처리 함수 정의

def postprocess(input_ids):
    input_ids = [sos_idx] + input_ids
    
    # 문장 최대 길이까지 슬라이싱
    input_ids = input_ids[:params['max_len']]

    # 패딩 토큰이 포함된 문장이라면 원 문장 말미에 <EOS> 토큰 삽입 
    if pad_idx in input_ids:
        pad_start = input_ids.index(pad_idx)
        input_ids[pad_start] = eos_idx

    # 패딩 토큰이 포함되지 않은 문장이라면, 시퀀스 말미에 <EOS> 토큰 삽입
    else:
        input_ids[-1] = eos_idx
    
    return input_ids

In [14]:
# 후처리 함수 이용해 토크나이즈 문장 후처리

processed_data = [postprocess(data.ids) for data in encoded_data]

In [15]:
# 후처리 결과 확인

print(f'후처리 결과: {processed_data[20]}\n')
print(f'후처리 결과 디코딩: {tokenizer.decode(processed_data[20])}')

후처리 결과: [1, 336, 104, 292, 168, 180, 271, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

후처리 결과 디코딩: 혼밥하기 좋아요


In [16]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

from torch.utils import data

torch.manual_seed(32)
torch.cuda.manual_seed(32)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [17]:
# CNN 모델 정의

class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.embedding = nn.Embedding(params['vocab_size'], params['embed_dim'], padding_idx=pad_idx)

        self.convs = nn.ModuleList([nn.Conv2d(in_channels=1, 
                                              out_channels=params['num_filters'], 
                                              kernel_size=(fs, params['embed_dim'])) 
                                    for fs in params['filter_sizes']])
        
        self.fc = nn.Linear(len(params['filter_sizes']) * params['num_filters'], 1)
        
        self.dropout = nn.Dropout(params['dropout'])
        
    def forward(self, input_ids):
        # input_ids    = [배치 사이즈, 문장 길이]

        embedded = self.embedding(input_ids).unsqueeze(1)
        # embedded     = [배치 사이즈, 채널 개수, 임베딩 차원]
        
        conved = [F.relu(conv(embedded)).squeeze(3) for conv in self.convs]
        # conved_n     = [배치 사이즈, 필터 개수, 문장 길이 - 필터 리스트[n] + 1]
        
        max_pooled = [F.max_pool1d(conv, conv.shape[2]).squeeze(2) for conv in conved]
        # max_pooled_n = [배치 사이즈, 필터 개수]

        cat = self.dropout(torch.cat(max_pooled, dim = 1))
        # cat          = [배치 사이즈, 필터 개수 x len(필터 리스트)]

        return self.fc(cat)  # [배치 사이즈, 1]

In [18]:
# 문장 및 라벨 데이터 torch Tensor로 변환

processed_data = [torch.LongTensor(data).to(device) for data in processed_data]
train_label = [torch.FloatTensor([label]).to(device) for label in train_label]


# torch Tensor로 변환한 데이터 이용해 Iterator 정의

train_iter = data.DataLoader(processed_data, batch_size=params['batch_size'])
label_iter = data.DataLoader(train_label, batch_size=params['batch_size'])

In [19]:
model = CNN()
model.to(device)

criterion = nn.BCEWithLogitsLoss()
criterion.to(device)

optimizer = optim.Adam(model.parameters(), lr=params['lr'])

for epoch in range(params['num_epoch']):
    model.train()
    epoch_loss = 0
    
    for (batch, label) in zip(train_iter, label_iter):
        optimizer.zero_grad()

        logits = model(batch).squeeze(1)    # [배치 사이즈]
        labels = label.view(label.size(0))  # [배치 사이즈]

        loss = criterion(logits, labels)
        epoch_loss += loss.item()
        
        loss.backward()
        optimizer.step()
        
    train_loss = epoch_loss / len(train_iter)        
    print(f'Epoch: {epoch+1:02} | Train Loss: {train_loss:.3f}')

Epoch: 01 | Train Loss: 0.814
Epoch: 02 | Train Loss: 0.460
Epoch: 03 | Train Loss: 0.334
Epoch: 04 | Train Loss: 0.222
Epoch: 05 | Train Loss: 0.142
Epoch: 06 | Train Loss: 0.076
Epoch: 07 | Train Loss: 0.059
Epoch: 08 | Train Loss: 0.076
Epoch: 09 | Train Loss: 0.054
Epoch: 10 | Train Loss: 0.029
Epoch: 11 | Train Loss: 0.022
Epoch: 12 | Train Loss: 0.012
Epoch: 13 | Train Loss: 0.014
Epoch: 14 | Train Loss: 0.015
Epoch: 15 | Train Loss: 0.019


In [20]:
# 결과 해석에 필요한 Captum API 임포트 및 정의

from captum.attr import LayerIntegratedGradients, TokenReferenceBase, visualization

token_reference = TokenReferenceBase(reference_token_idx=pad_idx)  # 레퍼런스 생성을 위한 모듈
lig = LayerIntegratedGradients(model, model.embedding)             # 결과 해석에 사용되는 IntegratedGradient 기법 모듈

In [31]:
vis_data_records_ig = []

label_vocab  = {0: '부정', 1: '긍정'}

def interpret_sentence(model, sentence, label = 0):       
    model.zero_grad()
    
    input_ids = tokenizer.encode(sentence)
    input_tokens = input_ids.tokens[:params['max_len']]
    
    input_ids = postprocess(input_ids.ids)
    input_indices_tensor = torch.LongTensor(input_ids).to(device).unsqueeze(0)

    # 단일 문장에 대한 예측 작업 수행
    pred = torch.sigmoid(model(input_indices_tensor)).item()
    pred_ind = round(pred)

    # 베이스 라인 역할을 할 Reference 생성: 주로 패딩 토큰으로 채워줌
    reference_indices = token_reference.generate_reference(params['max_len'], device=device).unsqueeze(0)

    # LayerIntegratedGradients 모듈 활용해 개별 단어의 속성값 및 델타값 근사치 계산
    attributions_ig, delta = lig.attribute(input_indices_tensor, reference_indices, n_steps=500, return_convergence_delta=True)

    print('pred: ', label_vocab[pred_ind], '(', '%.2f'%pred, ')', ', delta: ', abs(delta))
    
    add_attributions_to_visualizer(attributions_ig, input_tokens, )

    
    
    
def add_attributions_to_visualizer(attributions, text, pred, pred_ind, label, delta, vis_data_records):
    attributions = attributions.sum(dim=2).squeeze(0)
    attributions = attributions / torch.norm(attributions)
    attributions = attributions.cpu().detach().numpy()

    # 시각화 위해 샘플을 리스트에 추가
    vis_data_records.append(visualization.
                                VisualizationDataRecord(
                                    attributions,
                                    pred,
                                    label_vocab[pred_ind],
                                    label_vocab[label],
                                    label_vocab[1],
                                    attributions.sum(),       
                                    text,
                                delta
                                )
                           )

In [32]:
# 예제 문장 추가 및 분석 수행
interpret_sentence(model, '맛있어서 또 방문할 것 같아요. 국물이 일품입니다', label=1)
interpret_sentence(model, '가게도 낡았고 진짜 별로.. 사장님도 불친절하고 기분만 나빴네요', label=0)

pred:  긍정 ( 0.92 ) , delta:  tensor([0.9930], dtype=torch.float64)
pred:  부정 ( 0.11 ) , delta:  tensor([0.3498], dtype=torch.float64)


In [33]:
# 시각화 결과 표 변환
visualization.visualize_text(vis_data_records_ig)

True Label,Predicted Label,Attribution Label,Attribution Score,Word Importance


True Label,Predicted Label,Attribution Label,Attribution Score,Word Importance
