In [None]:
# colab 사용 시 주석 풀고 mount_path 설정 후 실행
# from google.colab import drive
# drive.mount("/content/drive")

In [None]:
#colab 사용시 주석 풀고 실행
# !pip install transformers
# vscode library 설치 안되있을 경우 실행
# %pip install transformers

In [None]:
#colab 사용시 주석 풀고 실행
# !pip install seaborn
# vscode library 설치 안되있을 경우 실행
# %pip install seaborn

In [None]:
from _init import *

from commons import file_util, string_util

import time
import torch
import random
import datetime
import numpy as np
import pandas as pd
import tensorflow as tf

from transformers import BertTokenizer
from transformers import BertForSequenceClassification, AdamW, BertConfig
from transformers import get_linear_schedule_with_warmup

from torch import nn
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler

from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.python.keras.callbacks import EarlyStopping

from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

# File and directory settings.
# All paths are built relative to WORK_DIR by plain string concatenation.
WORK_DIR = "../../../"
IN_DIR = WORK_DIR + "resources/keyword_extract/"
OUT_MODEL_PATH = WORK_DIR + "resources/keyword_extract_model/bert_model.pt"
# Column separator of the input data file.
DELIM = "\t"
# A progress line is printed every SETP_SIZE batches in the training and test loops.
SETP_SIZE = 100

# Hyperparameters.
BATCH_SIZE = 32
# Token sequences are truncated / zero-padded to this length.
MAX_SEQ_LEN = 260
LEARNING_RATE = 2e-5
EPOCHS = 25
# Seed applied to random, numpy and torch before training.
SEED_VAL = 0
# NOTE(review): DROPOUT_RATE and DENCE_UNIT are not referenced by any visible cell.
DROPOUT_RATE = 0.2
DENCE_UNIT = 2

# Enable exactly one of the BERT model names below.
# BERT_MODEL_NAME = "bert-base-uncased"
# BERT_MODEL_NAME = "bert-base-multilingual-cased"
BERT_MODEL_NAME = "klue/bert-base"
# This one is too large to run.
# BERT_MODEL_NAME = "klue/roberta-large"

In [None]:
# Ask TensorFlow for the name of the GPU device it can see (empty when none).
device_name = tf.test.gpu_device_name()

# Report whether the first GPU was found.
print(
    "Found GPU at : {}".format(device_name)
    if device_name == "/device:GPU:0"
    else "GPU device not found"
)

In [None]:
# Select the torch device: CUDA when it is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

if device.type == "cuda":
    print("There are %d GPU(s) available." % torch.cuda.device_count())
    print("We will use the GPU : ", torch.cuda.get_device_name(0))
else:
    print("No GPU available, using the CPU instead.")

In [None]:
# Load data when running from VS Code.
# Use this when the data should be handed over as a list.
def load(in_file_path: str, encoding: str, out_list: list) -> None:
    """Append the non-empty lines of every file under ``in_file_path`` to ``out_list``.

    Each line is passed through ``file_util.preprocess`` first; lines that
    ``string_util.is_empty(line, True)`` reports as empty are skipped.

    Args:
        in_file_path: Path handed to ``file_util.get_file_paths`` to list the files.
        encoding: Text encoding used to open each file.
        out_list: List that receives the lines; it is mutated in place.

    Returns:
        None. Results are delivered through ``out_list``.
    """
    file_paths = file_util.get_file_paths(in_file_path, True)

    for file_path in file_paths:
        in_file = file_util.open_file(file_path, encoding, "r")

        # Close every file right after it has been read, even on error.
        # Closing once after the loop released only the last file and raised
        # NameError when no file was found.
        try:
            while True:
                line = in_file.readline()

                # readline() returns an empty value at end of file.
                if not line:
                    break

                line = file_util.preprocess(line)
                if string_util.is_empty(line, True):
                    continue

                out_list.append(line)
        finally:
            in_file.close()

# 이거 활성화 시켜서 사용하면 됨.
# raw_data = []
# load(IN_DIR, ENCODING, raw_data)
# raw_data = [line.strip().split(DELIM) for line in raw_data]

# raw_df = pd.DataFrame(raw_data, columns=["train", "label"])
# print(raw_df.shape)
# raw_df.head()

In [None]:
# Extract the name of the data file found under IN_DIR.
file_paths = file_util.get_file_paths(IN_DIR, True)

# Only the name of the last listed file survives the loop.
file_name = None

for file_path in file_paths:
    file_name = file_util.get_file_name(file_path)

# Fail here with a clear message instead of a NameError in a later cell.
if file_name is None:
    raise FileNotFoundError("No input file found under {}".format(IN_DIR))

In [None]:
# Read the tab-separated data file into a two-column frame: text ("train") and "label".
raw_df = pd.read_csv(
    IN_DIR + file_name,
    sep=DELIM,
    names=["train", "label"],
)

# (rows, columns) of the loaded data.
raw_df.shape

In [None]:
# Preview the first rows of the loaded data.
raw_df.head()

In [None]:
# Split into a training part and a test part.
# Intended ratios over the whole data: train 0.81, validation 0.09, test 0.10.
# The validation share is carved out of the training part by a later cell.
# DATA_RATE is the row index where the test part starts. It is derived from
# the data size so it no longer has to be edited by hand for each dataset.
DATA_RATE = int(raw_df.shape[0] * 0.9)
print(f"train - test : {DATA_RATE}")

# NOTE(review): rows are taken in file order without shuffling; confirm the
# file is not sorted by label.
train_df = raw_df[:DATA_RATE]
test_df = raw_df[DATA_RATE:]

train_df.tail(), test_df.head()

In [None]:
# Training sentences: the "train" column of the training split.
sentence = train_df.train
sentence[:10]

In [None]:
# Training labels as a numpy array, in the same row order as `sentence`.
labels = train_df.label.values
labels

In [None]:
# Load the tokenizer for the selected model, keeping the original casing.
tokenizer = BertTokenizer.from_pretrained(BERT_MODEL_NAME, do_lower_case=False)

# Split every training sentence into word-piece tokens.
# NOTE(review): sentences are tokenized as-is, so no [CLS]/[SEP] markers are
# added here or in the id-conversion step. Confirm this is intended; any
# change must be mirrored in the test-set and inference preprocessing.
tokenized_texts = list(map(tokenizer.tokenize, sentence))

sentence[0], tokenized_texts[0]

In [None]:
# Map every token to its vocabulary id.
input_ids = list(map(tokenizer.convert_tokens_to_ids, tokenized_texts))

# Cut each sequence to MAX_SEQ_LEN and fill the missing tail with padding id 0.
input_ids = pad_sequences(
    input_ids,
    maxlen=MAX_SEQ_LEN,
    dtype="int64",
    truncating="post",
    padding="post",
)

input_ids[0]

In [None]:
# Attention masks: 1.0 where the token id is positive, 0.0 for padding id 0.
attention_masks = [[float(token_id > 0) for token_id in row] for row in input_ids]

attention_masks[0]

In [None]:
# Split the training part into train and validation (9 : 1 of the training
# part, i.e. roughly 0.81 / 0.09 of the whole data).
# Ids, labels and attention masks go through ONE call, so the three stay
# row-aligned by construction. Previously the masks were split by a second
# call that only matched because it repeated the same seed and test size.
train_text, val_text, train_label, val_label, train_masks, val_masks = train_test_split(
    input_ids,
    labels,
    attention_masks,
    random_state=SEED_VAL,
    test_size=0.1
)

# Convert everything to PyTorch tensors.
train_text = torch.tensor(train_text)
train_label = torch.tensor(train_label)
train_masks = torch.tensor(train_masks)
val_text = torch.tensor(val_text)
val_label = torch.tensor(val_label)
val_masks = torch.tensor(val_masks)

# Show the first example of each tensor as a sanity check.
print(train_text[0])
print(train_label[0])
print(train_masks[0])
print(val_text[0])
print(val_label[0])
print(val_masks[0])

In [None]:
# Training loader: batches are drawn in random order each epoch.
train_data = TensorDataset(train_text, train_masks, train_label)
train_sample = RandomSampler(train_data)
train_dataloader = DataLoader(
    train_data,
    sampler=train_sample,
    batch_size=BATCH_SIZE
)

# Validation loader: iterate in a fixed order. Shuffling gives evaluation
# nothing, and the reported accuracy is a mean of per-batch accuracies, so a
# random batch composition made the number vary from run to run.
validation_data = TensorDataset(val_text, val_masks, val_label)
validation_sample = SequentialSampler(validation_data)
validation_dataloader = DataLoader(
    validation_data,
    sampler=validation_sample,
    batch_size=BATCH_SIZE
)

In [None]:
# Test-set preparation starts here.
# `sentence` is rebound from the training sentences to the test sentences.
sentence = test_df.train
sentence[:10]

In [None]:
# `labels` is rebound from the training labels to the test labels.
labels = test_df.label.values
labels

In [None]:
# Same tokenizer settings as for the training data.
tokenizer = BertTokenizer.from_pretrained(BERT_MODEL_NAME, do_lower_case=False)

# Split every test sentence into word-piece tokens.
tokenized_texts = list(map(tokenizer.tokenize, sentence))

# NOTE(review): `sentence[0]` is not shown here. test_df is a tail slice of
# raw_df, so it presumably has no row labelled 0 - confirm.
tokenized_texts[0]

In [None]:
# Map every test token to its vocabulary id.
input_texts = list(map(tokenizer.convert_tokens_to_ids, tokenized_texts))

# Cut each sequence to MAX_SEQ_LEN and fill the missing tail with padding id 0.
input_texts = pad_sequences(
    input_texts,
    maxlen=MAX_SEQ_LEN,
    dtype="int64",
    truncating="post",
    padding="post",
)

input_texts[0]

In [None]:
# Attention masks for the test set: 1.0 for positive token ids, 0.0 for padding id 0.
# This rebinds `attention_masks`, which held the training masks before.
attention_masks = [[float(token_id > 0) for token_id in row] for row in input_texts]

attention_masks[0]

In [None]:
# Convert the test ids, labels and masks to PyTorch tensors.
test_texts, test_labels, test_masks = (
    torch.tensor(input_texts),
    torch.tensor(labels),
    torch.tensor(attention_masks),
)

# Show the first example of each tensor as a sanity check.
for first_item in (test_texts[0], test_labels[0], test_masks[0]):
    print(first_item)

In [None]:
# Test loader: iterate in a fixed order. The reported accuracy is a mean of
# per-batch accuracies, so a random batch composition made it vary between runs.
test_data = TensorDataset(test_texts, test_masks, test_labels)
test_sampler = SequentialSampler(test_data)
test_dataloader = DataLoader(test_data, sampler=test_sampler, batch_size=BATCH_SIZE)

In [None]:
# One output class per distinct label in the training split.
NUM_LABELS = len(train_df.label.unique())

# TODO: check whether dropout, dense units and the activation function can be
# configured per layer here.
model = BertForSequenceClassification.from_pretrained(BERT_MODEL_NAME, num_labels=NUM_LABELS)

# Move the model to the device selected earlier. `model.cuda()` raised on
# CPU-only machines even though `device` already falls back to the CPU.
model.to(device)

In [None]:
# Optimizer over all model parameters.
optimizer = AdamW(
    model.parameters(),
    lr=LEARNING_RATE,
    eps=1e-8,  # small epsilon that guards against division by zero
)

# Total optimizer steps: one per training batch, for every epoch.
total_steps = EPOCHS * len(train_dataloader)

# Learning-rate schedule without a warm-up phase.
# The name `schdeuler` (sic) is what the training loop calls, so it is kept.
schdeuler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=0,
    num_training_steps=total_steps,
)

In [None]:
# Accuracy helper shared by the validation and test loops.
def flat_accuracy(preds, labels):
    """Return the fraction of rows whose arg-max class equals the label.

    Args:
        preds: 2-D numpy array of scores; the arg-max is taken along axis 1.
        labels: numpy array of class ids; it is flattened before comparison.

    Returns:
        Number of matches divided by the number of labels.
    """
    predicted = np.argmax(preds, axis=1).flatten()
    expected = labels.flatten()
    matches = predicted == expected

    return np.sum(matches) / len(expected)

In [None]:
# Elapsed-time formatter used by the progress messages.
def format_time(elapsed):
    """Format a duration in seconds as an ``h:mm:ss`` string.

    Args:
        elapsed: Duration in seconds; it is rounded to the nearest whole second.

    Returns:
        ``str()`` of the matching ``datetime.timedelta``.
    """
    return str(datetime.timedelta(seconds=int(round(elapsed))))

In [31]:
# Fine-tune the model for EPOCHS epochs, running a validation pass after each one.

# Fix the random seeds (random, numpy, torch CPU and all CUDA devices) for reproducibility.
random.seed(SEED_VAL)
np.random.seed(SEED_VAL)
torch.manual_seed(SEED_VAL)
torch.cuda.manual_seed_all(SEED_VAL)

# NOTE(review): `count` is not referenced again in the visible cells.
count = 0

# Clear any gradients left over from earlier runs.
model.zero_grad()

# TODO: configure early stopping here.

for epoch_i in range(EPOCHS) :
    '''
        Train
    '''

    print("")
    print("======================== Epoch {:} / {:} =========================".format(epoch_i + 1, EPOCHS))
    print("Training...")

    start = time.time()

    # Sum of the per-batch losses of this epoch.
    total_loss = 0

    # Switch the model to training mode.
    model.train()

    # Iterate over the training batches.
    for step, batch in enumerate(train_dataloader) :
        # Print a progress line every SETP_SIZE batches, except for the first batch.
        if step % SETP_SIZE == 0 and not step == 0 :
            elapsed = format_time(time.time() - start)
            
            print("\tBatch {:} of {:}.\t\tElapsed: {:}.".format(step, len(train_dataloader), elapsed))

        # Move every tensor of the batch to the selected device.
        batch = tuple(t.to(device) for t in batch)

        # Unpack in the order the TensorDataset was built: ids, masks, labels.
        b_input_texts, b_input_mask, b_labels = batch

        # Forward pass; labels are supplied so that the model also yields a loss.
        outputs = model(
            b_input_texts,
            token_type_ids=None,
            attention_mask=b_input_mask,
            labels=b_labels
        )

        # The first output element is used as the loss.
        loss = outputs[0]

        # Accumulate the scalar loss for the epoch average.
        total_loss += loss.item()

        # Backward pass: compute the gradients.
        loss.backward()

        # Clip the gradient norm to 1.0 before the update.
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)

        # Update the weights.
        optimizer.step()

        # Advance the learning-rate schedule by one step.
        schdeuler.step()

        # Reset the gradients for the next batch.
        model.zero_grad()

    # Mean training loss over the batches of this epoch.
    avg_train_loss = total_loss / len(train_dataloader)

    print("")
    print("\tAverage training loss : {0:.2f}".format(avg_train_loss))
    print("\tTraining epcoh took : {:}".format(format_time(time.time() - start)))

    '''
        Validation
    '''

    print("")
    print("Running Validation...")

    start = time.time()

    # Switch the model to evaluation mode.
    model.eval()

    # Running totals; `eval_loss` and `nb_eval_examples` are never updated below.
    eval_loss, eval_accuracy = 0, 0
    nb_eval_steps, nb_eval_examples = 0, 0

    for batch in validation_dataloader :
        batch = tuple(t.to(device) for t in batch)

        b_input_texts, b_input_mask, b_labels = batch

        # No gradients are needed for evaluation; labels are not passed to the model.
        with torch.no_grad() :
            outputs = model(
                b_input_texts,
                token_type_ids=None,
                attention_mask=b_input_mask,
            )

        # Without labels, the first output element is used as the logits.
        logits = outputs[0]

        # Bring logits and labels back to the CPU as numpy arrays.
        logits = logits.detach().cpu().numpy()
        label_ids = b_labels.to("cpu").numpy()

        # Accuracy of this batch: arg-max of the logits against the labels.
        tmp_eval_accuracy = flat_accuracy(logits, label_ids)
        eval_accuracy += tmp_eval_accuracy
        nb_eval_steps += 1

    # Reported accuracy is the mean of the per-batch accuracies, so a smaller
    # last batch weighs as much as a full one.
    print("\tAccuracy : {0:.2f}".format(eval_accuracy / nb_eval_steps))
    print("\tValidation took : {:}".format(format_time(time.time() - start)))

print("")
print("Training complete!")

KeyboardInterrupt: 

In [None]:
# Evaluate the trained model on the test set.
start = time.time()

# Switch the model to evaluation mode.
model.eval()

# Running totals; only eval_accuracy and nb_eval_steps are updated below.
eval_loss, eval_accuracy = 0, 0
nb_eval_steps, nb_eval_examples = 0, 0

for step, batch in enumerate(test_dataloader):
    # Print a progress line every SETP_SIZE batches, except for the first batch.
    if step != 0 and step % SETP_SIZE == 0:
        elapsed = format_time(time.time() - start)
        print("\tBatch {:} of {:}.\tElapsed : {:}.".format(step, len(test_dataloader), elapsed))

    # Move the batch to the selected device and unpack ids, masks, labels.
    batch = tuple(t.to(device) for t in batch)
    b_input_texts, b_input_mask, b_labels = batch

    # Forward pass without gradients and without labels.
    with torch.no_grad():
        outputs = model(
            b_input_texts,
            token_type_ids=None,
            attention_mask=b_input_mask,
        )

    # The first output element is used as the logits; compare on the CPU as numpy.
    logits = outputs[0].detach().cpu().numpy()
    label_ids = b_labels.to("cpu").numpy()

    eval_accuracy += flat_accuracy(logits, label_ids)
    nb_eval_steps += 1

# Reported accuracy is the mean of the per-batch accuracies.
print("")
print("Accuracy : {0:.2f}".format(eval_accuracy / nb_eval_steps))
print("Test took : {:}".format(format_time(time.time() - start)))

In [None]:
# Save the trained model: only its state dict (the weights) is written to OUT_MODEL_PATH.
torch.save(model.state_dict(), OUT_MODEL_PATH)

In [None]:
# Class meant for loading the saved model.
# NOTE(review): the checkpoint written above is the state dict of a
# BertForSequenceClassification model, while this module only owns a single
# Linear layer ("layer.weight" / "layer.bias"). Those keys cannot match, so
# confirm how the checkpoint is supposed to be loaded.
class KeywordExtractorModel(nn.Module):
    """A module holding one linear layer that maps 2 input features to 1 output."""

    def __init__(self):
        super().__init__()
        self.layer = nn.Linear(2, 1)

    def forward(self, x):
        """Apply the linear layer to ``x`` and return the result."""
        return self.layer(x)

In [None]:
# Convert raw input sentences into model-ready tensors.
def convert_input_data(sentence):
    """Tokenize, pad and mask sentences the same way as the training data.

    Args:
        sentence: Iterable of sentence strings. A single ``str`` is also
            accepted and treated as one sentence.

    Returns:
        ``(inputs, masks)``: token ids padded / truncated to ``MAX_SEQ_LEN``
        per sentence, and masks that are 1.0 for positive ids and 0.0 for
        padding id 0.
    """
    # A bare string would otherwise be iterated character by character.
    if isinstance(sentence, str):
        sentence = [sentence]

    tokenized_texts = [tokenizer.tokenize(sent) for sent in sentence]

    # Map every token to its vocabulary id.
    input_texts = [tokenizer.convert_tokens_to_ids(x) for x in tokenized_texts]

    # "int64" matches the other padding cells. "long" follows the platform's
    # C long, which is not 64-bit everywhere.
    input_texts = pad_sequences(
        input_texts,
        maxlen=MAX_SEQ_LEN,
        dtype="int64",
        truncating="post",
        padding="post"
    )

    attention_masks = [[float(i > 0) for i in seq] for seq in input_texts]

    inputs = torch.tensor(input_texts)
    masks = torch.tensor(attention_masks)

    return inputs, masks

In [None]:
# Run the model on raw sentences.
def test_sentence(sentence):
    """Return the model's logits for the given sentences as a numpy array.

    Args:
        sentence: Sentences handed unchanged to ``convert_input_data``.

    Returns:
        The first element of the model output, detached and moved to the CPU.
    """
    model.eval()

    token_ids, attn_masks = convert_input_data(sentence)

    # Forward pass on the selected device, without gradients.
    with torch.no_grad():
        outputs = model(
            token_ids.to(device),
            token_type_ids=None,
            attention_mask=attn_masks.to(device),
        )

    return outputs[0].detach().cpu().numpy()

In [None]:
# Build one test sentence and classify it.
logits = test_sentence(["연기는 별로지만 재미 하나는 끝내줌!"])

print(logits)
# argmax without an axis indexes the flattened array; with a single input
# sentence that index is the predicted class.
# Assumes logits has one row per sentence - TODO confirm.
print(np.argmax(logits))

In [None]:
# 저장한 model 불러오기 + 위의 문장 테스트 함수 이용해서 적용되는지 확인 가능.
# device = "cuda" if torch.cuda.is_available() else "cpu"
# model = KeywordExtractorModel().to(device)

# model_state_dict = torch.load(out_dir, map_location=device)
# model.load_state_dict(model_state_dict)

# print(model)