In [None]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [None]:
!pip install --quiet transformers==4.3.0
!pip install --quiet tokenizers==0.10.3
!pip install --quiet torchtext==0.6.0
!pip install --quiet pytorch-lightning==1.2.10
!pip install --quiet torchmetrics==0.2.0
!pip install --quiet tokenizer

[K     |████████████████████████████████| 1.8 MB 31.2 MB/s 
[K     |████████████████████████████████| 880 kB 48.1 MB/s 
[K     |████████████████████████████████| 3.3 MB 45.9 MB/s 
[?25h  Building wheel for sacremoses (setup.py) ... [?25l[?25hdone
[K     |████████████████████████████████| 64 kB 2.7 MB/s 
[K     |████████████████████████████████| 1.3 MB 69.5 MB/s 
[K     |████████████████████████████████| 841 kB 25.4 MB/s 
[K     |████████████████████████████████| 176 kB 59.2 MB/s 
[K     |████████████████████████████████| 829 kB 68.5 MB/s 
[?25h  Building wheel for future (setup.py) ... [?25l[?25hdone
[K     |████████████████████████████████| 79 kB 6.6 MB/s 
[?25h

In [None]:
# Import packages
from typing import List, Dict
import tqdm.notebook as tq
from tqdm.notebook import tqdm
import json
import pandas as pd
import numpy as np
from tokenizer import tokenizer

import torch
from pathlib import Path
from torch.utils.data import Dataset, DataLoader
import pytorch_lightning as pl
from pytorch_lightning.callbacks import ModelCheckpoint
from transformers import (
    AdamW,
    T5ForConditionalGeneration,
    T5TokenizerFast as T5Tokenizer
    )

In [None]:
SEP_TOKEN = ''
MASKING_CHANCE = 0.3

In [None]:
class QGDataset(Dataset):

    def __init__(
        self,
        data: pd.DataFrame,
        tokenizer: T5Tokenizer,
        source_max_token_len: int,
        target_max_token_len: int
        ):

        self.tokenizer = tokenizer
        self.data = data
        self.source_max_token_len = source_max_token_len
        self.target_max_token_len = target_max_token_len

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index: int):
        data_row = self.data.iloc[index]

        if np.random.rand() > MASKING_CHANCE:
            answer = data_row['answer_text']
        else:
            answer = '[MASK]'

        source_encoding = tokenizer(
            '{} {} {}'.format(answer, SEP_TOKEN, data_row['context']),
            max_length= self.source_max_token_len,
            padding='max_length',
            truncation= True,
            return_attention_mask=True,
            add_special_tokens=True,
            return_tensors='pt'
            )
    
        target_encoding = tokenizer(
            '{} {} {}'.format(data_row['answer_text'], SEP_TOKEN, data_row['question']),
            max_length=self.target_max_token_len,
            padding='max_length',
            truncation = True,
            return_attention_mask=True,
            add_special_tokens=True,
            return_tensors='pt'
            )

        labels = target_encoding['input_ids']  
        labels[labels == 0] = -100

        return dict(
            answer_text = data_row['answer_text'],
            context = data_row['context'],
            question = data_row['question'],
            input_ids = source_encoding['input_ids'].flatten(),
            attention_mask = source_encoding['attention_mask'].flatten(),
            labels=labels.flatten()
            )

In [None]:
class QGDataModule(pl.LightningDataModule):

    def __init__(
        self,
        train_df: pd.DataFrame,
        val_df: pd.DataFrame,
        test_df: pd.DataFrame,
        tokenizer: T5Tokenizer,
        batch_size,
        source_max_token_len: int,
        target_max_token_len: int
        ): 
        super().__init__()
        self.batch_size = batch_size
        self.train_df = train_df
        self.val_df = val_df
        self.test_df = test_df
        self.tokenizer = tokenizer
        self.source_max_token_len = source_max_token_len
        self.target_max_token_len = target_max_token_len

    def setup(self):
        self.train_dataset = QGDataset(self.train_df, self.tokenizer, self.source_max_token_len, self.target_max_token_len)
        self.val_dataset = QGDataset(self.val_df, self.tokenizer, self.source_max_token_len, self.target_max_token_len)
        self.test_dataset = QGDataset(self.test_df, self.tokenizer, self.source_max_token_len, self.target_max_token_len)

    def train_dataloader(self):
        return DataLoader(self.train_dataset, batch_size = self.batch_size, shuffle=True, num_workers = 2)

    def val_dataloader(self): 
        return DataLoader(self.val_dataset, batch_size=1, num_workers=2)

    def test_dataloader(self):
        return DataLoader(self.test_dataset, batch_size=1, num_workers=2)

In [None]:
MODEL_NAME = 'paust/pko-t5-small'
SOURCE_MAX_TOKEN_LEN = 300
TARGET_MAX_TOKEN_LEN = 80

N_EPOCHS = 5
BATCH_SIZE = 4
LEARNING_RATE = 0.0001

In [None]:
tokenizer = T5Tokenizer.from_pretrained(MODEL_NAME)
print('tokenizer len before: ', len(tokenizer))
tokenizer.add_tokens(SEP_TOKEN)
print('tokenizer len after: ', len(tokenizer))
TOKENIZER_LEN = len(tokenizer)

Downloading:   0%|          | 0.00/2.92M [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/1.79k [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/1.95k [00:00<?, ?B/s]

tokenizer len before:  50358
tokenizer len after:  50358


In [None]:
class QGModel(pl.LightningModule):
    def __init__(self):
        super().__init__()
        self.model = T5ForConditionalGeneration.from_pretrained(MODEL_NAME, return_dict=True)
        self.model.resize_token_embeddings(TOKENIZER_LEN) #resizing after adding new tokens to the tokenizer

    def forward(self, input_ids, attention_mask, labels=None):
        output = self.model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
        return output.loss, output.logits

    def training_step(self, batch, batch_idx):
        input_ids = batch['input_ids']
        attention_mask = batch['attention_mask']
        labels = batch['labels']
        loss, output = self(input_ids, attention_mask, labels)
        self.log('train_loss', loss, prog_bar=True, logger=True)
        return loss

    def validation_step(self, batch, batch_idx):
        input_ids = batch['input_ids']
        attention_mask = batch['attention_mask']
        labels = batch['labels']
        loss, output = self(input_ids, attention_mask, labels)
        self.log('val_loss', loss, prog_bar=True, logger=True)
        return loss

    def test_step(self, batch, batch_idx):
        input_ids = batch['input_ids']
        attention_mask = batch['attention_mask']
        labels = batch['labels']
        loss, output = self(input_ids, attention_mask, labels)
        self.log('test_loss', loss, prog_bar=True, logger=True)
        return loss
  
    def configure_optimizers(self):
        return AdamW(self.parameters(), lr=LEARNING_RATE)

In [None]:
checkpoint_path = '/content/drive/MyDrive/BIZ&AI_PAPER_11기/세희/memento/task/model/checkpoints/best-checkpoint-v6.ckpt'

best_model = QGModel.load_from_checkpoint(checkpoint_path)
best_model.freeze()
best_model.eval()

print()

Downloading:   0%|          | 0.00/726 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/383M [00:00<?, ?B/s]




In [None]:
def generate(qgmodel: QGModel, answer: str, context: str) -> str:
    source_encoding = tokenizer(
        '{} {} {}'.format(SEP_TOKEN, answer, context),
        max_length=SOURCE_MAX_TOKEN_LEN,
        padding='max_length',
        truncation=True,
        return_attention_mask=True,
        add_special_tokens=True,
        return_tensors='pt'
    )

    generated_ids = qgmodel.model.generate(
        input_ids=source_encoding['input_ids'],
        attention_mask=source_encoding['attention_mask'],
        num_beams=1,
        max_length=TARGET_MAX_TOKEN_LEN,
        repetition_penalty=1.0,
        length_penalty=1.0,
        early_stopping=True,
        use_cache=True
    )

    preds = {
        tokenizer.decode(generated_id, skip_special_tokens=True, clean_up_tokenization_spaces=True)
        for generated_id in generated_ids
    }

    return ''.join(preds)


In [None]:
def show_result(generated: str):
    print('Generated: ', generated)

test

In [None]:
context = """나 어제 늦게 집에 도착해서 세탁기 돌려야 하는 거 잊었어. 오늘은 꼭 세탁기 돌려야겠다. 나 오늘 또 늦게 도착하면 세희한테 빨래 해달라고 부탁해야겠다."""
input_answer = '[MASK]'

generated = generate(best_model, input_answer, context)

show_result(generated)

Generated:  빨래  나 오늘 늦게 도착하면 세희에게 부탁해야 하는 것은?


In [None]:
context = """난 오늘 인공지능 및 응용 과목에서 RNN에 대해서 배웠다. 너무 어려웠지만 흥미로웠다. 방학에 더 많이 공부해야겠다고 생각했다."""
input_answer = '[MASK]'

generated = generate(best_model, input_answer, context)

show_result(generated)

Generated:  RNN  난 오늘 인공지능 및 응용 과목에서 무엇에 대해 배웠는가?


In [None]:
context = """내가 오늘 기분이 좋지 않아서 엄마에게 소리지르고 화를 냈다. 너무 속상했지만 엄마한테 미안했다.
그래서 내가 무지개 마카롱을 선물해드릴거다. 그리고 미안하다고 하고 엄마를 꼭 안아줄거다"""
input_answer = '[MASK]'

generated = generate(best_model, input_answer, context)

show_result(generated)

Generated:  무지개 마카롱  엄마에게 미안하다고 하고 엄마를 위해 엄마에게 선물해 드릴 것은?


In [None]:
context = """오늘 정보시스템학과 동기들이랑 선배들이랑 후배들이랑 종강파티를 했다. 
처음 보는 후배도 만났는데 너무 재밌었다. 다같이 노래방도 갔는데 내가 좋아하는 사람이 노래를 엄청 잘불러서 깜짝 놀랬다."""
input_answer = '[MASK]'

generated = generate(best_model, input_answer, context)

show_result(generated)

Generated:  노래방  친구들과 동기들과 후배들과 종강파티를 한 장소는?


In [None]:
context = """오늘 3시 40분에 하늘이랑 점심으로 치즈돈까스랑 김치찌개를 먹었다."""

input_answer = '[MASK]'

generated = generate(best_model, input_answer, context)

show_result(generated)

Generated:  치즈돈까스랑 김치찌개  오늘 3시 40분에 하늘이랑 점심으로 먹은 음식은?


In [None]:
context = """나랑 아빠랑 8시 10분에 제일곱창으로 갔는데 사람이 너무 많아 대기시간이 너무 길어서 한시간 웨이팅하고 곱창을 먹을 수 있었다. 오랫동안 기다리게 해서 사장님이 죄송하다고 했다."""

input_answer = '[MASK]'

generated = generate(best_model, input_answer, context)

show_result(generated)

Generated:  제일곱창  나랑 아빠랑 아빠랑 8시 10분에 갔던 가게의 이름은?


In [None]:
context = """교수님이 오늘 석철이한테 과제를 내지 않았다고 혼을 내셨다. 내가봐도 교수님이 내신 과제가 너무 어려웠는데 교수님께서 다음번엔 과제를 쉽게 내주셨으면 좋겠다"""

input_answer = '[MASK]'

generated = generate(best_model, input_answer, context)

show_result(generated)

Generated:  석철이  교수님이 오늘 과제를 내지 않았다고 혼낸 사람은?
