# Semantic Similarity with BERT

## Introduction

의미론적 유사성은 두 문장이 의미 측면에서 얼마나 유사한지 결정하는 작업입니다. 이 예제는 SNLI (Stanford Natural Language Inference) Corpus를 사용하여 Transformer로 문장의 의미 유사성을 예측하는 방법을 보여줍니다. 두 문장을 입력으로 받아 이 두 문장에 대한 유사성 점수를 출력하는 BERT 모델을 미세 조정합니다.

## Setup

In [None]:
#!pip install transformers

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchsummary import summary as summary_
import transformers
import sys
from torchtext.legacy import data, datasets
from transformers import BertTokenizer, BertModel
from dataclasses import dataclass
import numpy as np
import pandas as pd

In [2]:
# Prefer the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [3]:
# Rebinds `device` to the CPU regardless of the value assigned earlier.
# NOTE(review): presumably a debugging leftover — confirm before expecting GPU training.
device = "cpu"

## Configuration

In [4]:
max_length = 128  # Maximum length of an input sentence fed to the model
batch_size = 32
epochs = 2

# Dataset label names; the list position is used as the integer class id.
labels = ["contradiction", "entailment", "neutral"]

## Load the Data

In [None]:
# Download the data archive and unpack it into the working directory.
!curl -LO https://raw.githubusercontent.com/MohamadMerchant/SNLI/master/data.tar.gz
!tar -xvzf data.tar.gz

In [5]:
# There are more than 550k training samples in total; only the first 100k
# rows are loaded for this example.
train_df = pd.read_csv("SNLI_Corpus/snli_1.0_train.csv", nrows=100000)
valid_df = pd.read_csv("SNLI_Corpus/snli_1.0_dev.csv")
test_df = pd.read_csv("SNLI_Corpus/snli_1.0_test.csv")

# Report the number of rows in each split; every line reads its own frame.
print(f"Total train samples : {train_df.shape[0]}")
print(f"Total validation samples: {valid_df.shape[0]}")
print(f"Total test samples: {test_df.shape[0]}")

Total train samples : 100000
Total validation samples: 10000
Total test samples: 10000


데이터 세트 개요 :

- sentence 1 : 저자에게 제공된 전제 캡션.
- sentence 2 : 저자가 작성한 가설 캡션.
- similarity : 대부분의 어노테이터가 선택한 레이블. 다수 의견이 없는 경우 레이블 "-"가 사용된다 (여기서는 이러한 샘플을 건너뛴다).

데이터 세트의 "유사성"라벨 값

- Contradiction(모순) : 문장은 유사성을 공유하지 않는다. 
- Entailment(수반) : 문장의 의미가 비슷하다.
- Neutral(중립) : 문장이 중립적이다.

데이터 세트의 한 샘플

In [6]:
# Show one raw example (the row labelled 1) from the training split.
sample_row = train_df.loc[1]
print(f"Sentence1: {sample_row['sentence1']}")
print(f"Sentence2: {sample_row['sentence2']}")
print(f"Similarity: {sample_row['similarity']}")

Sentence1: A person on a horse jumps over a broken down airplane.
Sentence2: A person is at a diner, ordering an omelette.
Similarity: contradiction


## Preprocessing

In [7]:
# Some training rows contain NaN entries: report the per-column count first,
# then remove every row that has a missing value (in place).
missing_per_column = train_df.isnull().sum()
print("Number of missing values")
print(missing_per_column)
train_df.dropna(axis=0, inplace=True)

Number of missing values
similarity    0
sentence1     0
sentence2     3
dtype: int64


training targets의 분포

In [8]:
# Count how many training rows carry each `similarity` value.
train_target_counts = train_df["similarity"].value_counts()
print("Train Target Distribution")
print(train_target_counts)

Train Target Distribution
entailment       33384
contradiction    33310
neutral          33193
-                  110
Name: similarity, dtype: int64


validation targets의 분포

In [9]:
# Count how many validation rows carry each `similarity` value.
valid_target_counts = valid_df["similarity"].value_counts()
print("Validation Target Distribution")
print(valid_target_counts)

Validation Target Distribution
entailment       3329
contradiction    3278
neutral          3235
-                 158
Name: similarity, dtype: int64


"-" 값은 학습 및 검증 targets에서 일부 나타난다. 이 샘플들은 제외한다.

In [10]:
# Rows whose `similarity` is "-" carry no usable target. Drop them from the
# training and validation splits, then shuffle with a fixed seed so the row
# order is reproducible.
train_df = (
    train_df[train_df.similarity != "-"]
    .sample(frac=1.0, random_state=42)
    .reset_index(drop=True)
)
valid_df = (
    valid_df[valid_df.similarity != "-"]
    .sample(frac=1.0, random_state=42)
    .reset_index(drop=True)
)
# Filter the test split as well: the label encoding maps every value other
# than "contradiction"/"entailment" to class 2, so any "-" row left here
# would silently be scored as "neutral". Row order is kept; only the index
# is renumbered.
test_df = test_df[test_df.similarity != "-"].reset_index(drop=True)

Encode the training, validation, and test labels as integer class indices (0 = contradiction, 1 = entailment, 2 = neutral).

In [12]:
def _label_id(name):
    """Map a similarity name to its class id; anything unrecognised becomes 2."""
    if name == "contradiction":
        return 0
    if name == "entailment":
        return 1
    return 2


# Add an integer `label` column to every split ...
for _frame in (train_df, valid_df, test_df):
    _frame["label"] = _frame["similarity"].apply(_label_id)

# ... and expose each split's targets as a LongTensor.
y_train = torch.LongTensor(train_df["label"].values)
y_val = torch.LongTensor(valid_df["label"].values)
y_test = torch.LongTensor(test_df["label"].values)

## Create a custom data generator

sentence1과 sentence2를 묶어 pair로 사용.

batch size 별로 batch data 생성

셔플할건지와 타겟도 포함할 건지 선택

pretrained된 Bert tokenizer 사용

In [68]:
class BertSemanticDataGenerator:
    """Generate tokenized batches of sentence pairs for a BERT model.

    Args:
        sentence_pairs: NumPy array of (premise, hypothesis) string pairs,
            one pair per row.
        labels: Per-pair integer labels, or None when
            `include_targets=False`.
        batch_size: Integer batch size.
        shuffle: boolean, whether to shuffle the sample order.
        include_targets: boolean, whether to include the labels.

    Returns:
        Indexing with a batch number yields the tuple
        `([input_ids, attention_masks, token_type_ids], labels)`
        (or just `[input_ids, attention_masks, token_type_ids]`
        if `include_targets=False`). Every array is an int32 NumPy array.

    Raises:
        ValueError: if `include_targets=True` but `labels` is None.
        IndexError: if a batch index is outside `range(len(self))`.
    """

    def __init__(
        self,
        sentence_pairs,
        labels,
        batch_size=batch_size,
        shuffle=True,
        include_targets=True,
    ):
        if include_targets and labels is None:
            raise ValueError("labels are required when include_targets=True")

        self.sentence_pairs = sentence_pairs
        self.labels = labels
        self.shuffle = shuffle
        self.batch_size = batch_size
        self.include_targets = include_targets

        # Use the tokenizer of the pretrained bert-base-uncased model.
        self.tokenizer = transformers.BertTokenizer.from_pretrained(
            "bert-base-uncased", do_lower_case=True
        )
        self.indexes = np.arange(len(self.sentence_pairs))
        # One seeded generator for the object's lifetime: the first shuffle is
        # reproducible and later shuffles do not replay the same permutation.
        self._rng = np.random.RandomState(42)
        self.on_epoch_end()

    def __len__(self):
        # Number of complete batches; a trailing partial batch is dropped.
        return len(self.sentence_pairs) // self.batch_size

    def __iter__(self):
        # Explicit iteration over exactly `len(self)` batches.
        for batch_idx in range(len(self)):
            yield self[batch_idx]

    def __getitem__(self, idx):
        # Signal the end of the sequence so plain `for` loops terminate.
        if not 0 <= idx < len(self):
            raise IndexError(f"batch index {idx} out of range")

        # Select the sample indices that make up this batch.
        indexes = self.indexes[idx * self.batch_size : (idx + 1) * self.batch_size]
        sentence_pairs = self.sentence_pairs[indexes]

        # batch_encode_plus encodes each pair together, with the two
        # sentences separated by the [SEP] token.
        encoded = self.tokenizer.batch_encode_plus(
            sentence_pairs.tolist(),
            add_special_tokens=True,
            max_length=max_length,
            return_attention_mask=True,
            return_token_type_ids=True,
            padding='max_length',
            return_tensors="pt",
            truncation=True
        )

        # Convert the encoded features to NumPy arrays.
        input_ids = np.array(encoded["input_ids"], dtype="int32")
        attention_masks = np.array(encoded["attention_mask"], dtype="int32")
        token_type_ids = np.array(encoded["token_type_ids"], dtype="int32")

        # Targets are included when the generator is used for training/validation.
        if self.include_targets:
            labels = np.array(self.labels[indexes], dtype="int32")
            return [input_ids, attention_masks, token_type_ids], labels
        return [input_ids, attention_masks, token_type_ids]

    def on_epoch_end(self):
        # Reshuffle the sample order (called once at construction; callers
        # may call it again after each epoch).
        if self.shuffle:
            self._rng.shuffle(self.indexes)

## 모델 아키텍쳐 구성

1. pretrained Bert
2. bi-LSTM
3. Global Avg Pool 1d
4. Global Max Pool 1d
5. 3+4 concat
6. dropout 0.3
7. fc

In [69]:
class BertSemanticSimilarity(nn.Module):
    """BERT encoder + bi-LSTM + (average, max) pooling + linear classifier.

    The pretrained encoder starts frozen. Its frozen/trainable state follows
    the boolean `self.bert_model.trainable` attribute, which is applied to
    the encoder parameters' `requires_grad` whenever `train()`/`eval()` is
    called on this module. Setting `model.bert_model.trainable = True` and
    then calling `model.train()` therefore unfreezes the encoder.
    """

    def __init__(self):
        super().__init__()

        self.bert_model = transformers.BertModel.from_pretrained('bert-base-uncased')
        # Plain attribute assignment does not freeze anything in PyTorch, so
        # the flag is mirrored onto the parameters explicitly.
        self.bert_model.trainable = False
        self._sync_bert_trainable()

        self.emb_dim = self.bert_model.config.to_dict()['hidden_size']
        # batch_first=True: the encoder output is laid out (batch, seq, hidden).
        self.bi_lstm = nn.LSTM(input_size=self.emb_dim,
                               hidden_size=64,
                               bidirectional=True,
                               batch_first=True)
        # 2 directions * 64 hidden units = 128 features per pooling branch;
        # the average and max branches are concatenated -> 256 inputs.
        self.linear = nn.Linear(256, 3)
        self.dropout = nn.Dropout(p=0.3)

    def _sync_bert_trainable(self):
        """Copy the `bert_model.trainable` flag onto the encoder parameters."""
        trainable = bool(getattr(self.bert_model, "trainable", True))
        for param in self.bert_model.parameters():
            param.requires_grad = trainable

    def train(self, mode=True):
        """Switch train/eval mode and re-apply the encoder's trainable flag."""
        super().train(mode)
        self._sync_bert_trainable()
        return self

    def forward(self, input_ids, attention_masks, token_type_ids):
        """Return unnormalised class scores of shape (batch, 3).

        Args:
            input_ids: token ids, shape (batch, seq_len).
            attention_masks: padding mask with the same shape as `input_ids`.
            token_type_ids: segment ids with the same shape as `input_ids`.
        """
        outputs = self.bert_model(input_ids,
                                  attention_mask=attention_masks,
                                  token_type_ids=token_type_ids)

        # (batch, seq_len, 128) sequence of bidirectional LSTM states.
        sequence, _ = self.bi_lstm(outputs.last_hidden_state)
        # Global pooling over the sequence axis, giving (batch, 128) each.
        avg_pool = sequence.mean(dim=1)
        max_pool = sequence.max(dim=1)[0]
        concat = torch.cat([avg_pool, max_pool], dim=1)
        return self.linear(self.dropout(concat))

In [70]:
# Build the classifier and move its parameters to the selected device.
model = BertSemanticSimilarity().to(device)

다중 분류 문제이기 때문에 cross entropy loss 사용

In [17]:
# Cross-entropy loss, used because this is a multi-class classification task.
loss_fn = nn.CrossEntropyLoss().to(device)

def binary_accuracy(preds, y):
    """Return the fraction of rows whose top-scoring class equals the target.

    Args:
        preds: tensor of per-class scores, one row per sample.
        y: tensor of integer class ids, one per sample.

    Returns:
        A 0-dim float tensor holding the accuracy of this batch.
    """
    hits = torch.eq(preds.argmax(dim=1), y).float()
    return hits.sum() / len(hits)

In [65]:
def train(model, tain_data, optimizer, loss_fn, idx_epoch, batch_size):
    """Run one training epoch and return `(mean_loss, mean_accuracy)`.

    Args:
        model: module called as
            `model(input_ids, attention_masks, token_type_ids)` that returns
            per-class scores.
        tain_data: batch source supporting `len()` and integer indexing; each
            item is `([input_ids, attention_masks, token_type_ids], labels)`
            as NumPy arrays. (The misspelled name is kept so existing
            keyword callers keep working.)
        optimizer: optimizer stepping the model's parameters.
        loss_fn: callable `loss_fn(predictions, labels)` returning a scalar.
        idx_epoch: epoch number, used only in the progress line.
        batch_size: batch size, used only in the progress line.

    Raises:
        ValueError: if `tain_data` contains no batches.
    """
    # Use the argument that was passed in, not a module-level variable.
    batches = tain_data
    n_batches = len(batches)
    if n_batches == 0:
        raise ValueError("training data contains no batches")

    # Inputs must live on the same device as the model's parameters.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    epoch_loss = 0.0
    epoch_acc = 0.0

    model.train()

    # Index explicitly so the loop is bounded by len() whatever the batch
    # source does for out-of-range indices.
    for idx in range(n_batches):
        features, targets = batches[idx]

        optimizer.zero_grad()

        # Token and segment ids are cast to int64 for the embedding lookups.
        input_ids = torch.from_numpy(features[0]).long().to(device)
        attention_masks = torch.from_numpy(features[1]).to(device)
        token_type_ids = torch.from_numpy(features[2]).long().to(device)
        label = torch.from_numpy(targets).long().to(device)

        predictions = model(input_ids, attention_masks, token_type_ids)

        loss = loss_fn(predictions, label)
        acc = binary_accuracy(predictions, label)

        sys.stdout.write(
            "\r" + f"[Train] Epoch : {idx_epoch:^3}"
            f"[{(idx + 1) * batch_size} / {n_batches * batch_size} ({100. * (idx + 1) / n_batches:.4}%)]"
            f"  Loss: {loss.item():.4}"
            f"  Acc : {acc.item():.4}"
        )

        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        epoch_acc += acc.item()

    # Let the batch source reshuffle for the next epoch when it supports it.
    on_epoch_end = getattr(batches, "on_epoch_end", None)
    if callable(on_epoch_end):
        on_epoch_end()

    return epoch_loss / n_batches, epoch_acc / n_batches

In [45]:
def evaluate(model, data, loss_fn):
    """Evaluate the model on `data` and return `(mean_loss, mean_accuracy)`.

    Args:
        model: module called as
            `model(input_ids, attention_masks, token_type_ids)` that returns
            per-class scores.
        data: batch source supporting `len()` and integer indexing; each item
            is `([input_ids, attention_masks, token_type_ids], labels)` as
            NumPy arrays.
        loss_fn: callable `loss_fn(predictions, labels)` returning a scalar.

    Raises:
        ValueError: if `data` contains no batches.
    """
    n_batches = len(data)
    if n_batches == 0:
        raise ValueError("evaluation data contains no batches")

    # Inputs must live on the same device as the model's parameters.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    epoch_loss = 0.0
    epoch_acc = 0.0

    model.eval()
    with torch.no_grad():
        # Index explicitly so the loop is bounded by len() whatever the batch
        # source does for out-of-range indices.
        for idx in range(n_batches):
            features, targets = data[idx]

            input_ids = torch.from_numpy(features[0]).long().to(device)
            attention_masks = torch.from_numpy(features[1]).to(device)
            token_type_ids = torch.from_numpy(features[2]).long().to(device)
            label = torch.from_numpy(targets).long().to(device)

            predictions = model(input_ids, attention_masks, token_type_ids)

            # Use the loss passed in by the caller.
            loss = loss_fn(predictions, label)
            acc = binary_accuracy(predictions, label)

            epoch_loss += loss.item()
            epoch_acc += acc.item()

    # Average over the batches that were actually evaluated.
    return epoch_loss / n_batches, epoch_acc / n_batches

In [None]:
def predict(model, data):
    """Return the model's raw class scores for a single batch.

    Args:
        model: module called as
            `model(input_ids, attention_masks, token_type_ids)`.
        data: either `[input_ids, attention_masks, token_type_ids]` or
            `([input_ids, attention_masks, token_type_ids], labels)`, with
            NumPy arrays as elements. Labels, when present, are ignored.

    Returns:
        The tensor returned by the model, computed in eval mode without
        gradient tracking.
    """
    # A nested first element means the batch also carries labels.
    features = data[0] if isinstance(data[0], (list, tuple)) else data

    # Inputs must live on the same device as the model's parameters.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    input_ids = torch.from_numpy(features[0]).long().to(device)
    attention_masks = torch.from_numpy(features[1]).to(device)
    token_type_ids = torch.from_numpy(features[2]).long().to(device)

    model.eval()
    with torch.no_grad():
        predictions = model(input_ids, attention_masks, token_type_ids)

    return predictions

## Training, Validation Data Generator 생성

In [71]:
# Pull the two sentence columns out of each frame as string arrays, then wrap
# them (with their targets) in batch generators. Only the training batches
# are shuffled.
pair_columns = ["sentence1", "sentence2"]
train_pairs = train_df[pair_columns].values.astype("str")
valid_pairs = valid_df[pair_columns].values.astype("str")

train_data = BertSemanticDataGenerator(
    train_pairs, y_train, batch_size=batch_size, shuffle=True
)
valid_data = BertSemanticDataGenerator(
    valid_pairs, y_val, batch_size=batch_size, shuffle=False
)

## 학습

In [None]:
best_valid_loss = float('inf')
# Use the bare class name: formatting the class object itself yields its
# repr ("<class '...'>"), which makes an awkward, non-portable file name.
model_name = BertSemanticSimilarity.__name__
optimizer = torch.optim.Adam(model.parameters())

print('----------------------------------------------------------------')
print(f'Model name : {model_name}')
print('----------------------------------------------------------------')

for epoch in range(epochs):
    train_loss, train_acc = train(model, train_data, optimizer, loss_fn, epoch, batch_size)
    valid_loss, valid_acc = evaluate(model, valid_data, loss_fn)
    print('')
    # Keep only the checkpoint with the lowest validation loss so far.
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), f'./{model_name}.pt')
        print(f'\t Saved at {epoch}-epoch')

    print(f'\t Epoch : {epoch} | Train Loss : {train_loss:.4} | Train Acc : {train_acc:.4}')
    print(f'\t Epoch : {epoch} | Valid Loss : {valid_loss:.4} | Valid Acc : {valid_acc:.4}')

----------------------------------------------------------------
Model name : <class '__main__.BertSemanticSimilarity'>
----------------------------------------------------------------

### Fine Tuning

In [None]:
# Unfreeze the encoder. The attribute alone is only a flag, so gradient
# tracking is also enabled explicitly on every encoder parameter.
model.bert_model.trainable = True
model.bert_model.requires_grad_(True)

best_valid_loss = float('inf')
# Use the bare class name: formatting the class object itself yields its
# repr ("<class '...'>"), which makes an awkward, non-portable file name.
model_name = BertSemanticSimilarity.__name__
# A small learning rate is used so the pretrained encoder weights are only
# adjusted gently while fine-tuning.
optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)

print('----------------------------------------------------------------')
print(f'Model name : {model_name}')
print('----------------------------------------------------------------')

for epoch in range(epochs):
    train_loss, train_acc = train(model, train_data, optimizer, loss_fn, epoch, batch_size)
    valid_loss, valid_acc = evaluate(model, valid_data, loss_fn)
    print('')
    # Keep only the checkpoint with the lowest validation loss so far.
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), f'./{model_name}.pt')
        print(f'\t Saved at {epoch}-epoch')

    print(f'\t Epoch : {epoch} | Train Loss : {train_loss:.4} | Train Acc : {train_acc:.4}')
    print(f'\t Epoch : {epoch} | Valid Loss : {valid_loss:.4} | Valid Acc : {valid_acc:.4}')

## Test

In [None]:
# Wrap the test sentence pairs and their targets in an unshuffled generator.
test_pairs = test_df[["sentence1", "sentence2"]].values.astype("str")
test_data = BertSemanticDataGenerator(
    test_pairs, y_test, batch_size=batch_size, shuffle=False
)

In [None]:
# Restore the saved checkpoint, then score it on the test batches.
checkpoint_path = f'./{model_name}.pt'
saved_state = torch.load(checkpoint_path)
model.load_state_dict(saved_state)

test_loss, test_acc = evaluate(model, test_data, loss_fn)
print(f'Test Loss : {test_loss:.4} | Test Acc : {test_acc:.4}')

In [None]:
def check_similarity(sentence1, sentence2):
    """Classify the relation between two sentences with the trained model.

    Args:
        sentence1: first sentence (converted with `str`).
        sentence2: second sentence (converted with `str`).

    Returns:
        `(label, probability)` where `label` is an entry of `labels` and
        `probability` is the softmax probability of that class formatted as
        a percentage string.
    """
    sentence_pairs = np.array([[str(sentence1), str(sentence2)]])
    test_data = BertSemanticDataGenerator(
        sentence_pairs, labels=None, batch_size=1, shuffle=False, include_targets=False,
    )
    # The generator holds exactly one feature-only batch.
    input_ids, attention_masks, token_type_ids = test_data[0]

    # Inputs must live on the same device as the model's parameters.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    # Run the module directly (an nn.Module has no `predict` method), in eval
    # mode and without gradient tracking.
    model.eval()
    with torch.no_grad():
        logits = model(
            torch.from_numpy(input_ids).long().to(target_device),
            torch.from_numpy(attention_masks).to(target_device),
            torch.from_numpy(token_type_ids).long().to(target_device),
        )

    # The model emits unnormalised scores; softmax turns them into
    # probabilities over the three classes.
    proba = torch.softmax(logits, dim=1)[0]
    idx = int(torch.argmax(proba))
    pred = labels[idx]
    # Scale the [0, 1] probability to a percentage to match the "%" suffix.
    return pred, f"{100 * proba[idx].item(): .2f}%"

In [None]:
# Example: score one hand-written sentence pair.
sentence1 = "Two women are observing something together."
sentence2 = "Two women are standing with their eyes closed."
check_similarity(sentence1, sentence2)