# 1. KoGPT2

In [None]:
# Mount Google Drive (Colab only); the CSV read and the model save in the cells below use paths under /content/drive.
from google.colab import drive
drive.mount("/content/drive")

Mounted at /content/drive


In [None]:
!pip install ratsnlp

In [None]:
#### 1. 데이터 전처리 ####

import pandas as pd
import re
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import GPT2Tokenizer, PreTrainedTokenizerFast, GPT2LMHeadModel

# Load the gaslighting dialogue corpus and keep only its first 3,000 rows.
Chatbot_Data = pd.read_csv(
    "/content/drive/MyDrive/NLPicasso/최종/Gaslighting_augmentation_data.csv"
).iloc[:3000]

# Special tokens used when assembling a training sequence.
Q_TKN, A_TKN = "<usr>", "<sys>"        # prefix of the question turn / answer turn
BOS = EOS = '</s>'                     # one token serves as both BOS and EOS
MASK, SENT = '<unused0>', '<unused1>'  # label placeholder / question-answer separator
PAD = '<pad>'

# Tokenizer of the pretrained KoGPT2 checkpoint with the special tokens registered.
koGPT2_TOKENIZER = PreTrainedTokenizerFast.from_pretrained(
    "skt/kogpt2-base-v2",
    bos_token=BOS,
    eos_token=EOS,
    unk_token='<unk>',
    pad_token=PAD,
    mask_token=MASK,
)

# Define Dataset
class ChatbotDataset(Dataset):
    """Turn (prompt, competition) rows into fixed-length KoGPT2 training triples.

    Each item is ``(token_ids, mask, label_ids)``, all of length ``max_len``:

    * ``token_ids`` - ids of ``<usr> question <unused1> <sys> answer </s>``, padded.
    * ``mask``      - numpy array, 1 on answer positions and 0 elsewhere.
    * ``label_ids`` - the mask token on question positions followed by the
      answer tokens shifted left by one, padded.

    Args:
        chats: DataFrame with string columns ``"prompt"`` and ``"competition"``.
        max_len: length every returned sequence is padded or truncated to.

    Raises:
        ValueError: if ``max_len`` is smaller than 1.
    """

    def __init__(self, chats, max_len=30):
        if max_len < 1:
            raise ValueError("max_len must be at least 1")
        self._data = chats
        self.max_len = max_len
        self.q_token = Q_TKN
        self.a_token = A_TKN
        self.sent_token = SENT
        self.eos = EOS
        self.mask = MASK
        self.tokenizer = koGPT2_TOKENIZER

    def __len__(self):
        """Number of dialogue rows."""
        return len(self._data)

    def __getitem__(self, idx):
        """Build the ``(token_ids, mask, label_ids)`` triple for row ``idx``."""
        turn = self._data.iloc[idx]
        # '.', '!' and ',' are replaced by a space before tokenizing.
        q = re.sub(r"([.!,])", r" ", turn["prompt"])
        a = re.sub(r"([.!,])", r" ", turn["competition"])

        q_toked = self.tokenizer.tokenize(self.q_token + q + self.sent_token)
        a_toked = self.tokenizer.tokenize(self.a_token + a + self.eos)

        # Without truncation an over-long pair yields sequences of different
        # lengths (tokens, mask and labels disagree), which breaks batching.
        if len(q_toked) + len(a_toked) > self.max_len:
            if len(q_toked) >= self.max_len:
                # Keep only the tail of the question so the answer still fits.
                q_toked = q_toked[-max(1, self.max_len // 2):]
            a_toked = a_toked[: self.max_len - len(q_toked)]

        q_len, a_len = len(q_toked), len(a_toked)
        pad_id = self.tokenizer.pad_token_id
        n_pad = self.max_len - q_len - a_len

        # Only answer positions take part in the loss.
        mask = [0] * q_len + [1] * a_len + [0] * n_pad

        # Labels are shifted by one: the input at position i predicts token i + 1.
        labels = [self.mask] * q_len + a_toked[1:]
        label_ids = list(self.tokenizer.convert_tokens_to_ids(labels))
        label_ids += [pad_id] * (self.max_len - len(label_ids))

        token_ids = list(self.tokenizer.convert_tokens_to_ids(q_toked + a_toked))
        token_ids += [pad_id] * n_pad

        return (token_ids, np.array(mask), label_ids)

# Define collate function
def collate_batch(batch, max_len=200, pad_token_id=None):
    """Stack dataset items into three ``(batch, max_len)`` long tensors.

    Every sequence is right-padded, or truncated, to exactly ``max_len`` so
    the batch is always rectangular.

    Args:
        batch: list of ``(token_ids, mask, label_ids)`` items.
        max_len: target sequence length; the default matches the length the
            training dataset in this notebook is built with.
        pad_token_id: id used to pad tokens and labels; defaults to the pad id
            of the notebook-level KoGPT2 tokenizer.

    Returns:
        ``(token_ids, mask, label_ids)`` as ``torch.long`` tensors.
    """
    if pad_token_id is None:
        pad_token_id = koGPT2_TOKENIZER.pad_token_id

    def _fit(seq, fill):
        # Plain Python ints keep tensor construction uniform for lists and numpy arrays.
        values = [int(v) for v in seq][:max_len]
        return values + [fill] * (max_len - len(values))

    data = [_fit(item[0], pad_token_id) for item in batch]
    mask = [_fit(item[1], 0) for item in batch]
    label = [_fit(item[2], pad_token_id) for item in batch]

    return (
        torch.tensor(data, dtype=torch.long),
        torch.tensor(mask, dtype=torch.long),
        torch.tensor(label, dtype=torch.long),
    )


# Build the training dataset and loader; sequences are padded to 200 tokens.
train_set = ChatbotDataset(Chatbot_Data, max_len=200)
train_dataloader = DataLoader(train_set, batch_size=10, num_workers=0, shuffle=True, collate_fn=collate_batch)

# Length of the token sequence of every item, to spot anything longer than expected.
max_lengths = [len(train_set[i][0]) for i in range(len(train_set))]
print("Maximum length in the dataset:", max(max_lengths))

# Sanity check: show the first batch only, since printing every batch floods the output.
print("start")
for token_ids, mask, label in train_dataloader:
    print("token_ids ====> ", token_ids)
    print("mask =====> ", mask)
    print("label =====> ", label)
    break
print("end")

In [None]:
#### 2. 데이터 모델링 및 챗봇 ####
from transformers import GPT2Tokenizer, PreTrainedTokenizerFast, GPT2LMHeadModel

# Load the pretrained KoGPT2 language model and put it in training mode,
# on the GPU when one is available.
model_kogpt = GPT2LMHeadModel.from_pretrained('skt/kogpt2-base-v2')
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_kogpt.to(device)
model_kogpt.train()

# Training parameters
learning_rate = 3e-5
criterion = torch.nn.CrossEntropyLoss(reduction="none")
optimizer = torch.optim.Adam(model_kogpt.parameters(), lr=learning_rate)
n_epochs = 10   # kept separate from the loop variable so it is never overwritten
Sneg = -1e18    # value written into the logits of positions outside the answer

print ("start")
for epoch in range(n_epochs):
    epoch_loss, n_batches = 0.0, 0
    for batch_idx, samples in enumerate(train_dataloader):
        optimizer.zero_grad()
        token_ids, mask, label = (t.to(device) for t in samples)

        out = model_kogpt(token_ids).logits
        # Where mask == 0 every logit becomes the same constant, so those
        # positions send no gradient back to the model. The (batch, seq, 1)
        # condition broadcasts over the vocabulary axis instead of being copied.
        mask_out = torch.where(mask.unsqueeze(dim=2) == 1, out, torch.full_like(out, Sneg))
        loss = criterion(mask_out.transpose(2, 1), label)
        # NOTE(review): masked positions still add a constant term to loss.sum(),
        # so the reported value is higher than the answer-only loss.
        avg_loss = loss.sum() / mask.sum()
        avg_loss.backward()
        optimizer.step()

        epoch_loss += avg_loss.item()
        n_batches += 1

    print("epoch {}/{} - mean loss {:.4f}".format(epoch + 1, n_epochs, epoch_loss / max(n_batches, 1)))

print("end")

# Chat with the trained model (type "quit" to stop).
# Generation is greedy: the most likely next token is appended until EOS appears.
MAX_NEW_TOKENS = 200  # hard stop so the loop ends even if EOS is never predicted

model_kogpt = model_kogpt.to('cpu')  # moved once, not once per generated token
model_kogpt.eval()                   # the model was left in train() mode; turn dropout off

with torch.no_grad():
    while True:
        q = input("user > ").strip()
        if q == "quit":
            break
        a = ""
        for _ in range(MAX_NEW_TOKENS):
            input_ids = torch.LongTensor(koGPT2_TOKENIZER.encode(Q_TKN + q + SENT + A_TKN + a)).unsqueeze(dim=0)
            pred = model_kogpt(input_ids).logits
            # Only the prediction at the last position is needed.
            next_id = torch.argmax(pred[0, -1]).item()
            gen = koGPT2_TOKENIZER.convert_ids_to_tokens(next_id)
            if gen == EOS:
                break
            # "▁" marks a word boundary in the tokenizer output.
            a += gen.replace("▁", " ")
        print("Chatbot > {}".format(a.strip()))

Downloading pytorch_model.bin:   0%|          | 0.00/513M [00:00<?, ?B/s]

start


In [None]:
# Save the fine-tuned KoGPT2 model to Drive.
# NOTE(review): this pickles the whole model object, not a state dict
# (model_kogpt.state_dict()) — confirm which form the loading code expects.
torch.save(model_kogpt, '/content/drive/MyDrive/NLPicasso/데이터 전처리/fine_tuned_model_1.pt')

# 2. Sentence Transformer

In [None]:
!pip install -U sentence-transformers

In [None]:
from sentence_transformers import SentenceTransformer
import numpy as np

# sent1 is the chatbot's last reply: `a` only exists if the KoGPT2 chat cell has been run in this kernel.
sent1 = a
# Reference sentence the reply is compared against.
sent2 = "왜 그렇게 말씀하세요...전 정말 노력했어요..."
# Sentence-embedding model read by cosine() as a global.
model = SentenceTransformer('paraphrase-distilroberta-base-v1')

def cosine(sent1, sent2, encoder=None):
    """Return the cosine similarity between the embeddings of two sentences.

    Args:
        sent1: first sentence.
        sent2: second sentence.
        encoder: object with an ``encode(list_of_sentences)`` method returning
            one vector per sentence. Defaults to the notebook-global ``model``,
            which later cells rebind, so pass it explicitly when in doubt.

    Returns:
        The cosine similarity, or ``0.0`` when either embedding has zero norm
        (the plain formula would divide by zero there).
    """
    if encoder is None:
        encoder = model
    # Embed both sentences in a single call.
    emb1, emb2 = encoder.encode([sent1, sent2])
    denom = np.linalg.norm(emb1) * np.linalg.norm(emb2)
    if denom == 0:
        return 0.0
    # Similarity formula: dot product over the product of the norms.
    return np.dot(emb1, emb2) / denom

# Score the chatbot reply against the reference sentence and print the result.
result = cosine(sent1,sent2)
print("Cosine Similarity : ",result)


# 2-1. STS

In [None]:
!pip install -U sentence-transformers

In [None]:
!pip install -U sentence-transformers

In [None]:
from sentence_transformers import SentenceTransformer, models
# Two sample sentences to embed.
sentences = [
    "This is an example sentence",
    "Each sentence is converted",
]

# Sentence encoder: the TAACO_STS transformer followed by mean pooling over tokens.
embedding_model = models.Transformer(model_name_or_path="KDHyun08/TAACO_STS", max_seq_length=256, do_lower_case=True)
pooling_model = models.Pooling(
    embedding_model.get_word_embedding_dimension(),
    pooling_mode_cls_token=False,
    pooling_mode_max_tokens=False,
    pooling_mode_mean_tokens=True,
)
model = SentenceTransformer(modules=[embedding_model, pooling_model])

# One embedding vector per sentence.
embeddings = model.encode(sentences)
print(embeddings)


In [None]:
from sentence_transformers import SentenceTransformer, models, util
import torch

# Rebuild the TAACO_STS sentence encoder: transformer backbone + mean pooling.
embedding_model = models.Transformer(model_name_or_path="KDHyun08/TAACO_STS", max_seq_length=256, do_lower_case=True)
pooling_model = models.Pooling(
    embedding_model.get_word_embedding_dimension(),
    pooling_mode_cls_token=False,
    pooling_mode_max_tokens=False,
    pooling_mode_mean_tokens=True,
)
model = SentenceTransformer(modules=[embedding_model, pooling_model])

# `a` is the GPT reply left in the kernel by the KoGPT2 chat cell.
docs = a
query = '내가 언제 그랫어? 너가 잘못 기억하고 있는거야'

# Encode the reply and the query into vectors.
document_embeddings = model.encode(docs)
query_embedding = model.encode(query)

# Cosine similarity between the query and the reply.
cos_scores = util.pytorch_cos_sim(query_embedding, document_embeddings)[0]

print(f"입력 문장: {query}")
print(f'gpt 답변: {a}')
print(round(cos_scores.item(),3))

# 3. BERT

In [None]:
# 일단 트랜스포머 깔아주시고
!pip install transformers

In [None]:
!pip install --upgrade tensorflow

In [None]:
# 패키지 불러와
import tensorflow as tf
import torch
# 귀찮으니깐 bert tokenizer 쓸께유
from transformers import BertTokenizer
from transformers import BertForSequenceClassification, AdamW, BertConfig
from transformers import get_linear_schedule_with_warmup
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler
from keras.preprocessing.sequence import pad_sequences
# 데이터셋 학습/평가 나누기
from sklearn.model_selection import train_test_split

import pandas as pd
import numpy as np
import random
import time
import datetime

In [None]:
# Check the runtime: number of visible CUDA devices, then the name of each.
n_devices = torch.cuda.device_count()
print(n_devices)

for device_name in map(torch.cuda.get_device_name, range(n_devices)):
    print(device_name)

1
Tesla T4


In [None]:
# Mount Google Drive (Colab only); the CSV files read below live under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Two corpora for the classifier: the gaslighting dialogues and a chatbot dialogue set (labelled 1 and 0 in the next cell).
df_gas = pd.read_csv("/content/drive/MyDrive/NLPicasso/최종/Gaslighting_augmentation_data.csv")
df_chatbot = pd.read_csv("/content/drive/MyDrive/NLPicasso/최종/ChatbotData.csv")

In [None]:
# Label the corpora: 1 = gaslighting, 0 = everyday chatbot dialogue.
df_gas['label'] = 1
df_chatbot['label'] = 0
# Give both frames identical column names so they can be stacked.
df_gas.columns = ["prompt", "competition", "label"]
df_chatbot.columns = ["prompt", "competition", "label"]

# Stack the corpora and keep only the response text and its label.
df_all = pd.concat([df_gas, df_chatbot], axis=0, ignore_index=True).drop(columns=['prompt'])

# sample() returns a new frame, so the shuffle must be assigned back. Otherwise
# df_all stays ordered by class and a positional train/test slice separates
# the classes instead of mixing them.
df_all = df_all.sample(frac=1, random_state=42).reset_index(drop=True)
df_all.head(10)

Unnamed: 0,competition,label
6322,열심히 저축해서 분양받으세요.,0
1872,아직 시작한지 한달도 안됐어 너 그렇게 집중못하면 커서 거지 되는거야,1
8596,필요한 것만 사세요.,0
9084,자신이 편하고 즐거울 수 있는 방법을 생각해보세요.,0
3503,누구나 걱정은 있어요.,0
...,...,...
8595,오랜만에 쇼핑해보세요.,0
7976,무서운 소리 마세요.,0
4377,뭐라도 드세요.,0
1883,썩 맘에 들게 한 건 아닌데 봐줄만 해,1


In [None]:
# The first 13,000 rows are used for training; the remaining rows are held out for testing.
train = df_all.iloc[:13000]
test = df_all.iloc[13000:]

In [None]:
# Wrap every training response in BERT's [CLS] ... [SEP] markers.
sentences = [f"[CLS] {text!s} [SEP]" for text in train["competition"]]

In [None]:
# Training labels as a numpy array, aligned with `sentences`.
labels = train["label"].to_numpy()

In [None]:
import pandas as pd
from transformers import BertTokenizer

# Load the multilingual cased BERT tokenizer and show how it splits a short Korean word.
tokenizer = BertTokenizer.from_pretrained("bert-base-multilingual-cased", do_lower_case=False)
result = tokenizer.tokenize('안녕하세요')
print(result)

Downloading (…)solve/main/vocab.txt:   0%|          | 0.00/996k [00:00<?, ?B/s]

Downloading (…)okenizer_config.json:   0%|          | 0.00/29.0 [00:00<?, ?B/s]

Downloading (…)lve/main/config.json:   0%|          | 0.00/625 [00:00<?, ?B/s]

['안', '##녕', '##하', '##세', '##요']


In [None]:
# Tokenize every wrapped training sentence with the multilingual cased BERT tokenizer.
tokenizer = BertTokenizer.from_pretrained(
    'bert-base-multilingual-cased',
    do_lower_case=False,
)
tokenized_texts = list(map(tokenizer.tokenize, sentences))

In [None]:
print(sentences[0])  # before tokenization
print(tokenized_texts[0]) # after tokenization

[CLS] 자꾸 고집 피우지마 난 그런 말 한 적이 없는데거든? [SEP]
['[CLS]', '자', '##꾸', '고', '##집', '피', '##우', '##지', '##마', '난', '그', '##런', '말', '한', '적', '##이', '없는', '##데', '##거', '##든', '?', '[SEP]']


In [None]:
# Maximum sequence length; longer sequences are cut at the end, shorter ones padded at the end.
MAX_LEN = 128

# Map tokens to vocabulary ids, then pad/truncate to MAX_LEN.
input_ids = pad_sequences(
    [tokenizer.convert_tokens_to_ids(tokens) for tokens in tokenized_texts],
    maxlen=MAX_LEN,
    dtype="long",
    truncating="post",
    padding="post",
)

In [None]:
# Sanity check: the number of padded sequences and the number of labels are printed side by side.
print(len(input_ids))
print(len(labels))


13000
13000


In [None]:
# Attention mask: 1.0 where the token id is greater than 0, 0.0 elsewhere
# (the padded positions, assuming pad_sequences filled them with 0).
attention_masks = [[float(token_id > 0) for token_id in seq] for seq in input_ids]

In [None]:
# Split ids, labels and attention masks into training (90%) and validation (10%)
# parts in one call, so all three are partitioned with the same shuffled indices.
(train_inputs, validation_inputs,
 train_labels, validation_labels,
 train_masks, validation_masks) = train_test_split(
    input_ids,
    labels,
    attention_masks,
    random_state=2000,
    test_size=0.1,
)

# Convert every part to a PyTorch tensor.
train_inputs, validation_inputs = torch.tensor(train_inputs), torch.tensor(validation_inputs)
train_labels, validation_labels = torch.tensor(train_labels), torch.tensor(validation_labels)
train_masks, validation_masks = torch.tensor(train_masks), torch.tensor(validation_masks)

In [None]:
batch_size = 32

# Bundle (input ids, attention mask, label) triples.
train_data = TensorDataset(train_inputs, train_masks, train_labels)
validation_data = TensorDataset(validation_inputs, validation_masks, validation_labels)

# Training batches are drawn in random order; validation batches keep dataset order.
train_sampler = RandomSampler(train_data)
validation_sampler = SequentialSampler(validation_data)

# NOTE: this rebinds `train_dataloader`, which the KoGPT2 section also defines.
train_dataloader = DataLoader(train_data, sampler=train_sampler, batch_size=batch_size)
validation_dataloader = DataLoader(validation_data, sampler=validation_sampler, batch_size=batch_size)

In [None]:
# Prepare the held-out test set with the same steps as the training data.

# [CLS] + sentence + [SEP]
sentences = ["[CLS] " + str(sentence) + " [SEP]" for sentence in test.competition]

# Label data
labels = test['label'].values

# Word-piece tokenization
tokenizer = BertTokenizer.from_pretrained('bert-base-multilingual-cased', do_lower_case=False)
tokenized_texts = [tokenizer.tokenize(sent) for sent in sentences]

# Convert tokens to ids and pad/truncate to a fixed length
MAX_LEN = 128
input_ids = [tokenizer.convert_tokens_to_ids(x) for x in tokenized_texts]
input_ids = pad_sequences(input_ids, maxlen=MAX_LEN, dtype="long", truncating="post", padding="post")

# Attention mask: 1.0 for ids greater than 0, 0.0 otherwise
attention_masks = [[float(i > 0) for i in seq] for seq in input_ids]

# Convert to PyTorch tensors
test_inputs = torch.tensor(input_ids)
test_labels = torch.tensor(labels)
test_masks = torch.tensor(attention_masks)

# Batch size and loader. Evaluation does not need shuffling: a sequential
# sampler keeps the batch composition, and therefore the per-batch averaged
# accuracy, identical from run to run.
batch_size = 32
test_data = TensorDataset(test_inputs, test_masks, test_labels)
test_sampler = SequentialSampler(test_data)
test_dataloader = DataLoader(test_data, sampler=test_sampler, batch_size=batch_size)

In [None]:
# Pick the compute device: CPU when CUDA is unavailable, otherwise the GPU.
if not torch.cuda.is_available():
    device = torch.device("cpu")
    print('No GPU available, using the CPU instead.')
else:
    device = torch.device("cuda")
    gpu_count = torch.cuda.device_count()
    print('There are %d GPU(s) available.' % gpu_count)
    print('We will use the GPU:', torch.cuda.get_device_name(0))

There are 1 GPU(s) available.
We will use the GPU: Tesla T4


In [None]:
# Multilingual BERT with a 2-class classification head (label 1 = gaslighting, 0 = everyday dialogue).
model_bert = BertForSequenceClassification.from_pretrained("bert-base-multilingual-cased", num_labels=2)
# Follow the device chosen in the previous cell rather than assuming CUDA, so the CPU fallback works too.
model_bert.to(device)

In [None]:
# Optimizer. transformers' own AdamW is deprecated in favour of the PyTorch
# implementation; weight_decay=0.0 keeps the value the transformers class used
# by default (torch.optim.AdamW would otherwise apply 0.01).
optimizer = torch.optim.AdamW(model_bert.parameters(),
                              lr = 2e-5,  # learning rate
                              eps = 1e-8,
                              weight_decay = 0.0)

# Number of epochs
epochs = 6

# Total training steps: batches per epoch * epochs
total_steps = len(train_dataloader) * epochs

# Linear learning-rate decay over the whole run, with no warm-up steps
scheduler = get_linear_schedule_with_warmup(optimizer,
                                            num_warmup_steps = 0,
                                            num_training_steps = total_steps)

In [None]:
# Accuracy helper
def flat_accuracy(preds, labels):
    """Return the fraction of rows whose arg-max class equals the label.

    Args:
        preds: 2-D array of per-class scores, one row per example.
        labels: numpy array of integer class ids; flattened before comparing.
    """
    predicted = np.argmax(preds, axis=1).ravel()
    expected = labels.ravel()
    n_correct = (predicted == expected).sum()
    return n_correct / expected.size


# Elapsed-time formatting helper
def format_time(elapsed):
    """Render a duration given in seconds as an ``h:mm:ss`` string.

    The value is rounded to whole seconds before formatting.
    """
    whole_seconds = int(round(elapsed))
    duration = datetime.timedelta(seconds=whole_seconds)
    return str(duration)

In [None]:
# Fix the random seeds
seed_val = 42
random.seed(seed_val)
np.random.seed(seed_val)
torch.manual_seed(seed_val)
torch.cuda.manual_seed_all(seed_val)

# Clear any gradients left from earlier runs
model_bert.zero_grad()

# Fine-tune the classifier: one training pass and one validation pass per epoch
for epoch_i in range(0, epochs):

    # ========================================
    #               Training
    # ========================================

    print("")
    print('======== Epoch {:} / {:} ========'.format(epoch_i + 1, epochs))
    print('Training...')

    # Start the timer and reset the running loss
    t0 = time.time()
    total_loss = 0

    # Switch to training mode
    model_bert.train()

    for step, batch in enumerate(train_dataloader):
        # Progress report every 500 batches
        if step % 500 == 0 and not step == 0:
            elapsed = format_time(time.time() - t0)
            print('  Batch {:>5,}  of  {:>5,}.    Elapsed: {:}.'.format(step, len(train_dataloader), elapsed))

        # Move the batch to the selected device and unpack it
        batch = tuple(t.to(device) for t in batch)
        b_input_ids, b_input_mask, b_labels = batch

        # Forward pass. This must be model_bert, the model whose parameters the
        # optimizer updates; the bare name `model` refers to the sentence encoder
        # built in the similarity cells.
        outputs = model_bert(b_input_ids,
                             token_type_ids=None,
                             attention_mask=b_input_mask,
                             labels=b_labels)

        # With labels supplied, the first output is the loss
        loss = outputs[0]
        total_loss += loss.item()

        # Backward pass
        loss.backward()

        # Clip the gradient norm to 1.0
        torch.nn.utils.clip_grad_norm_(model_bert.parameters(), 1.0)

        # Update the weights, then advance the learning-rate schedule
        optimizer.step()
        scheduler.step()

        # Reset gradients for the next batch
        model_bert.zero_grad()

    # Mean loss over the epoch's batches
    avg_train_loss = total_loss / len(train_dataloader)

    print("")
    print("  Average training loss: {0:.2f}".format(avg_train_loss))
    print("  Training epcoh took: {:}".format(format_time(time.time() - t0)))

    # ========================================
    #               Validation
    # ========================================

    print("")
    print("Running Validation...")

    # Restart the timer and switch to evaluation mode
    t0 = time.time()
    model_bert.eval()

    # Running sum of per-batch accuracies and number of batches seen
    eval_accuracy = 0
    nb_eval_steps = 0

    for batch in validation_dataloader:
        batch = tuple(t.to(device) for t in batch)
        b_input_ids, b_input_mask, b_labels = batch

        # No gradients are needed for evaluation
        with torch.no_grad():
            outputs = model_bert(b_input_ids,
                                 token_type_ids=None,
                                 attention_mask=b_input_mask)

        # Without labels, the first output is the logits
        logits = outputs[0]

        # Move to the CPU as numpy arrays
        logits = logits.detach().cpu().numpy()
        label_ids = b_labels.to('cpu').numpy()

        # Accuracy of this batch
        tmp_eval_accuracy = flat_accuracy(logits, label_ids)
        eval_accuracy += tmp_eval_accuracy
        nb_eval_steps += 1

    print("  Accuracy: {0:.2f}".format(eval_accuracy/nb_eval_steps))
    print("  Validation took: {:}".format(format_time(time.time() - t0)))

print("")
print("Training complete!")

In [None]:
# Evaluate the fine-tuned classifier on the held-out test set.
model_bert.eval()

# Start a timer for this cell; without it the elapsed times below would be
# measured from the last validation pass of the training cell.
t0 = time.time()

# Running sum of per-batch accuracies and number of batches seen
eval_accuracy = 0
nb_eval_steps = 0

for step, batch in enumerate(test_dataloader):
    # Progress report every 100 batches
    if step % 100 == 0 and not step == 0:
        elapsed = format_time(time.time() - t0)
        print('  Batch {:>5,}  of  {:>5,}.    Elapsed: {:}.'.format(step, len(test_dataloader), elapsed))

    # Move the batch to the selected device and unpack it
    batch = tuple(t.to(device) for t in batch)
    b_input_ids, b_input_mask, b_labels = batch

    # No gradients are needed for evaluation
    with torch.no_grad():
        outputs = model_bert(b_input_ids,
                             token_type_ids=None,
                             attention_mask=b_input_mask)

    # Without labels, the first output is the logits
    logits = outputs[0]

    # Move to the CPU as numpy arrays
    logits = logits.detach().cpu().numpy()
    label_ids = b_labels.to('cpu').numpy()

    # Accuracy of this batch
    tmp_eval_accuracy = flat_accuracy(logits, label_ids)
    eval_accuracy += tmp_eval_accuracy
    nb_eval_steps += 1

print("")
print("Accuracy: {0:.2f}".format(eval_accuracy/nb_eval_steps))
print("Test took: {:}".format(format_time(time.time() - t0)))


Accuracy: 0.91
Test took: 0:00:25


In [None]:
# Convert raw sentences into model inputs
def convert_input_data(sentences, max_len=128, bert_tokenizer=None):
    """Turn raw sentences into padded BERT input ids and attention masks.

    Each sentence is wrapped as ``[CLS] sentence [SEP]``, the same form the
    training and test data in this notebook are given, unless it already
    starts with ``[CLS]``. It is then tokenized, mapped to ids, cut at the end
    to ``max_len`` and right-padded with 0.

    Args:
        sentences: iterable of sentences.
        max_len: fixed length of every returned sequence.
        bert_tokenizer: object with ``tokenize`` and ``convert_tokens_to_ids``;
            defaults to the notebook-global ``tokenizer``.

    Returns:
        ``(inputs, masks)``: a ``torch.long`` tensor of ids and a float tensor
        holding 1.0 for ids greater than 0 and 0.0 for padding, both of shape
        ``(len(sentences), max_len)``.
    """
    if bert_tokenizer is None:
        bert_tokenizer = tokenizer

    input_ids, attention_masks = [], []
    for sent in sentences:
        text = str(sent)
        # Inference input must carry the same special tokens as the training input.
        if not text.startswith("[CLS]"):
            text = "[CLS] " + text + " [SEP]"

        ids = list(bert_tokenizer.convert_tokens_to_ids(bert_tokenizer.tokenize(text)))[:max_len]
        ids = ids + [0] * (max_len - len(ids))

        input_ids.append(ids)
        attention_masks.append([float(i > 0) for i in ids])

    # reshape keeps a (0, max_len) shape when no sentences are given
    inputs = torch.tensor(input_ids, dtype=torch.long).reshape(-1, max_len)
    masks = torch.tensor(attention_masks, dtype=torch.float).reshape(-1, max_len)

    return inputs, masks

In [None]:
# Classify raw sentences
def test_sentences(sentences):
    """Run the fine-tuned BERT classifier on raw sentences and return its logits.

    Args:
        sentences: list of raw sentences.

    Returns:
        The model's first output (the logits) as a numpy array on the CPU.
    """
    # Evaluation mode
    model_bert.eval()

    # Sentences -> id tensor and attention-mask tensor
    inputs, masks = convert_input_data(sentences)

    # Forward pass on the selected device, without tracking gradients
    with torch.no_grad():
        outputs = model_bert(inputs.to(device),
                             token_type_ids=None,
                             attention_mask=masks.to(device))

    # Without labels, the first output is the logits; hand them back as numpy
    return outputs[0].detach().cpu().numpy()

In [None]:
# Interactive check: classify each typed sentence until the user enters 'quit'.
label_names = {1: "가스라이팅", 0: "일상 대화"}

while True:
    ipt = input()
    if ipt == 'quit':
        break

    logits = test_sentences([ipt])
    print(logits)

    # Index of the larger logit: 1 = gaslighting, 0 = everyday conversation
    verdict = label_names.get(int(np.argmax(logits)))
    if verdict is not None:
        print(verdict)

quit


In [None]:
# Save the fine-tuned BERT classifier. The object to save is model_bert; the
# bare name `model` is the sentence encoder from the similarity cells.
# NOTE(review): this pickles the whole model object rather than a state dict —
# confirm which form the loading code expects.
torch.save(model_bert, '/content/drive/MyDrive/NLPicasso/최종/bert_finetuned_gas.pth')