In [1]:
!pip install transformers --quiet
!pip install sentencepiece --quiet
!pip install unidecode --quiet

[K     |████████████████████████████████| 3.1 MB 5.1 MB/s 
[K     |████████████████████████████████| 59 kB 6.7 MB/s 
[K     |████████████████████████████████| 895 kB 42.6 MB/s 
[K     |████████████████████████████████| 3.3 MB 40.5 MB/s 
[K     |████████████████████████████████| 596 kB 39.6 MB/s 
[K     |████████████████████████████████| 1.2 MB 5.1 MB/s 
[K     |████████████████████████████████| 235 kB 5.1 MB/s 
[?25h

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
!ls 'drive/MyDrive/Kuliah/Semester 7/Pemrosesan Bahasa Alami/Tugas besar'

 dataset			       'Question generator Bert.ipynb'
 datauji-Question-generator.csv        'Question generator main '
 hasiluji-Question-generator-bert.csv  'Question generator T5.ipynb'
 hasiluji-Question-generator-T5.csv     Translation_Encoder2Decoder_GRU.ipynb
 model				        Translation_Encoder2Decoder_LSTM.ipynb
 ppt.pptx


In [4]:
import torch
import transformers
from transformers import T5Tokenizer, T5ForConditionalGeneration, T5Config
import spacy

In [5]:
DIR = 'drive/MyDrive/Kuliah/Semester 7/Pemrosesan Bahasa Alami/Tugas besar'

In [6]:
from tensorflow import keras
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Input, LSTM, Dense, Dropout
from tensorflow.keras import optimizers, metrics, backend as K
import re
import pandas as pd
import os
import unidecode
import numpy as np
from sklearn.model_selection import train_test_split

np.random.seed(2434)

SOS = '\t' # start of sequence.
EOS = '*' # end of sequence.
CHARS = list('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ ')
REMOVE_CHARS = '[#$%"\+@<=>!&,-.?:;()*\[\]^_`{|}~/\d\t\n\r\x0b\x0c]'

def truncated_acc(y_true, y_pred):
    y_true = y_true[:, :16, :]
    y_pred = y_pred[:, :16, :]
    acc = metrics.categorical_accuracy(y_true, y_pred)
    return K.mean(acc, axis=-1)
    
def truncated_loss(y_true, y_pred):
    y_true = y_true[:, :16, :]
    y_pred = y_pred[:, :16, :]
    loss = K.categorical_crossentropy(
        target=y_true, output=y_pred, from_logits=False)
    return K.mean(loss, axis=-1)
    
def seq2seq(hidden_size, nb_input_chars, nb_target_chars):
    encoder_inputs = Input(shape=(None, nb_input_chars),
                           name='encoder_data')
    encoder_lstm = LSTM(hidden_size, recurrent_dropout=0.2,
                        return_sequences=True, return_state=False,
                        name='encoder_lstm_1')
    encoder_outputs = encoder_lstm(encoder_inputs)
    
    encoder_lstm = LSTM(hidden_size, recurrent_dropout=0.2,
                        return_sequences=False, return_state=True,
                        name='encoder_lstm_2')
    encoder_outputs, state_h, state_c = encoder_lstm(encoder_outputs)
    encoder_states = [state_h, state_c]
    
    decoder_inputs = Input(shape=(None, nb_target_chars),
                           name='decoder_data')
    decoder_lstm = LSTM(hidden_size, dropout=0.2, return_sequences=True,
                        return_state=True, name='decoder_lstm')
    decoder_outputs, _, _ = decoder_lstm(decoder_inputs,
                                         initial_state=encoder_states)
    decoder_softmax = Dense(nb_target_chars, activation='softmax',
                            name='decoder_softmax')
    decoder_outputs = decoder_softmax(decoder_outputs)
    
    model = Model(inputs=[encoder_inputs, decoder_inputs],
                  outputs=decoder_outputs)
    
    adam = optimizers.Adam(lr=0.001, decay=0.0)
    model.compile(optimizer=adam, loss='categorical_crossentropy',
                  metrics=['accuracy', truncated_acc, truncated_loss])
    
    encoder_model = Model(inputs=encoder_inputs, outputs=encoder_states)
    
    decoder_state_input_h = Input(shape=(hidden_size,))
    decoder_state_input_c = Input(shape=(hidden_size,))
    decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]
    decoder_outputs, state_h, state_c = decoder_lstm(
        decoder_inputs, initial_state=decoder_states_inputs)
    decoder_states = [state_h, state_c]
    decoder_outputs = decoder_softmax(decoder_outputs)
    decoder_model = Model(inputs=[decoder_inputs] + decoder_states_inputs,
                          outputs=[decoder_outputs] + decoder_states)
    
    return model, encoder_model, decoder_model
    
class CharacterTable(object):
    def __init__(self, chars):
        self.chars = sorted(set(chars))
        self.char2index = dict((c, i) for i, c in enumerate(self.chars))
        self.index2char = dict((i, c) for i, c in enumerate(self.chars))
        self.size = len(self.chars)
    
    def encode(self, C, nb_rows):
        x = np.zeros((nb_rows, len(self.chars)), dtype=np.float32)
        for i, c in enumerate(C):
            x[i, self.char2index[c]] = 1.0
        return x

    def decode(self, x, calc_argmax=True):
        if calc_argmax:
            indices = x.argmax(axis=-1)
        else:
            indices = x
        chars = ''.join(self.index2char[ind] for ind in indices)
        return indices, chars

    def sample_multinomial(self, preds, temperature=1.0):
        preds = np.reshape(preds, len(self.chars)).astype(np.float64)
        preds = np.log(preds) / temperature
        exp_preds = np.exp(preds)
        preds = exp_preds / np.sum(exp_preds)
        probs = np.random.multinomial(1, preds, 1)
        index = np.argmax(probs)
        char  = self.index2char[index]
        return index, char

def split_text(data_path, input_path):
    file_path = os.path.join(data_path, input_path)
    with open(file_path, "r") as f:
        data = f.read().split('\n')
        data = np.array(data)
        train, test = train_test_split(data,test_size=0.2)
        train_text = '\n'.join([i for i in train[1:]])
        test_text = '\n'.join([i for i in test[1:]])
        return train_text, test_text

def tokenize(text):
    tokens = [re.sub(REMOVE_CHARS, '', token)
              for token in re.split("[-\n ]", text)]
    return tokens

def add_spelling_errors(token, error_rate):
    assert(0.0 <= error_rate < 1.0)
    if len(token) < 3:
        return token
    rand = np.random.rand()
    prob = error_rate / 4.0
    if rand < prob:
        random_char_index = np.random.randint(len(token))
        token = token[:random_char_index] + np.random.choice(CHARS) \
                + token[random_char_index + 1:]
    elif prob * 2 < rand < prob * 3:
        random_char_index = np.random.randint(len(token))
        token = token[:random_char_index] + np.random.choice(CHARS) \
                + token[random_char_index:]
    elif prob < rand < prob * 2:
        random_char_index = np.random.randint(len(token))
        token = token[:random_char_index] + token[random_char_index + 1:]
    elif prob * 3 < rand < prob * 4:
        random_char_index = np.random.randint(len(token) - 1)
        token = token[:random_char_index]  + token[random_char_index + 1] \
                + token[random_char_index] + token[random_char_index + 2:]
    else:
        pass
    return token

def transform(tokens, maxlen, error_rate=0.5, shuffle=True):
    if shuffle:
        print('Shuffling data.')
        np.random.shuffle(tokens)
    encoder_tokens = []
    decoder_tokens = []
    target_tokens = []
    for token in tokens:
        encoder = add_spelling_errors(token, error_rate=error_rate)
        encoder += EOS * (maxlen - len(encoder))
        encoder_tokens.append(encoder)
    
        decoder = SOS + token
        decoder += EOS * (maxlen - len(decoder))
        decoder_tokens.append(decoder)
    
        target = decoder[1:]
        target += EOS * (maxlen - len(target))
        target_tokens.append(target)
        
        assert(len(encoder) == len(decoder) == len(target))
    return encoder_tokens, decoder_tokens, target_tokens

def batch(tokens, maxlen, ctable, batch_size=128, reverse=False):
    def generate(tokens, reverse):
        while(True):
            for token in tokens:
                if reverse:
                    token = token[::-1]
                yield token
    
    token_iterator = generate(tokens, reverse)
    data_batch = np.zeros((batch_size, maxlen, ctable.size),
                          dtype=np.float32)
    while(True):
        for i in range(batch_size):
            token = next(token_iterator)
            data_batch[i] = ctable.encode(token, maxlen)
        yield data_batch

def decode_sequences(inputs, targets, input_ctable, target_ctable,
                     maxlen, reverse, encoder_model, decoder_model,
                     nb_examples, sample_mode='argmax', random=True):
    input_tokens = []
    target_tokens = []
    if random:
        indices = np.random.randint(0, len(inputs), nb_examples)
    else:
        indices = range(nb_examples)
    for index in indices:
        input_tokens.append(inputs[index])
        target_tokens.append(targets[index])
    input_sequences = batch(input_tokens, maxlen, input_ctable,
                            nb_examples, reverse)
    input_sequences = next(input_sequences)
    states_value = encoder_model.predict(input_sequences)
    target_sequences = np.zeros((nb_examples, 1, target_ctable.size))
    target_sequences[:, 0, target_ctable.char2index[SOS]] = 1.0
    for _ in range(maxlen):
        char_probs, h, c = decoder_model.predict(
            [target_sequences] + states_value)
        target_sequences = np.zeros((nb_examples, 1, target_ctable.size))
        sampled_chars = []
        for i in range(nb_examples):
            if sample_mode == 'argmax':
                next_index, next_char = target_ctable.decode(
                    char_probs[i], calc_argmax=True)
            elif sample_mode == 'multinomial':
                next_index, next_char = target_ctable.sample_multinomial(
                    char_probs[i], temperature=0.5)
            else:
                raise Exception(
                    "`sample_mode` accepts `argmax` or `multinomial`.")
            sampled_chars.append(next_char) 
            # Update target sequence with index of next character.
            target_sequences[i, 0, next_index] = 1.0

        stop_char = set(sampled_chars)
        if len(stop_char) == 1 and stop_char.pop() == EOS:
            break
        states_value = [h, c]
    target_tokens  = [re.sub('[%s]' % EOS, '', token)
                      for token in target_tokens]
    return target_tokens

def restore_model(model_path, hidden_size):
    model = load_model(model_path, custom_objects={
        'truncated_acc': truncated_acc, 'truncated_loss': truncated_loss})
    
    encoder_inputs = model.input[0] # encoder_data
    encoder_lstm1 = model.get_layer('encoder_lstm_1')
    encoder_lstm2 = model.get_layer('encoder_lstm_2')
    
    encoder_outputs = encoder_lstm1(encoder_inputs)
    _, state_h, state_c = encoder_lstm2(encoder_outputs)
    encoder_states = [state_h, state_c]
    encoder_model = Model(inputs=encoder_inputs, outputs=encoder_states)

    decoder_inputs = model.input[1]
    decoder_state_input_h = Input(shape=(hidden_size,))
    decoder_state_input_c = Input(shape=(hidden_size,))
    decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]
    decoder_lstm = model.get_layer('decoder_lstm')
    decoder_outputs, state_h, state_c = decoder_lstm(
        decoder_inputs, initial_state=decoder_states_inputs)
    decoder_states = [state_h, state_c]
    decoder_softmax = model.get_layer('decoder_softmax')
    decoder_outputs = decoder_softmax(decoder_outputs)
    decoder_model = Model(inputs=[decoder_inputs] + decoder_states_inputs,
                          outputs=[decoder_outputs] + decoder_states)
    return encoder_model, decoder_model

def spell_correction(test_text):
    text,_ = split_text(f'{DIR}/dataset/', 'movie_lines_preprocess.txt')
    vocab = tokenize(text)
    vocab = list(filter(None, set(vocab)))
    maxlen = max([len(token) for token in vocab]) + 2
    train_encoder, train_decoder, train_target = transform(
        vocab, maxlen, error_rate=0.6, shuffle=False)

    tokens = tokenize(test_text)
    tokens = list(filter(None, tokens))
    nb_tokens = len(tokens)
    final_tokens, _, target_tokens = transform(
        tokens, maxlen, error_rate=0, shuffle=False)

    input_chars = set(' '.join(train_encoder))
    target_chars = set(' '.join(train_decoder))
    input_ctable = CharacterTable(input_chars)
    target_ctable = CharacterTable(target_chars)

    encoder_model, decoder_model = restore_model(f'{DIR}/model/seq2seq_spellcorrection.h5', 512)

    target_tokens = decode_sequences(
        final_tokens, target_tokens, input_ctable, target_ctable,
        maxlen, True, encoder_model, decoder_model, nb_tokens,
        sample_mode='argmax', random=False)
    return ' '.join([token for token in target_tokens])

In [7]:
class QuestionGenerator:
  def __init__(self, pretrained_path=f'{DIR}/model/qg_pretrained_t5_model_trained.pth'):
    self.SEQ_LENGTH = 512
    self.PRETRAINED_MODEL = 't5-base'
    self.nlp = nlp = spacy.load("en_core_web_sm")

    # create tokenizer
    self.tokenizer = T5Tokenizer.from_pretrained(self.PRETRAINED_MODEL)
    self.tokenizer.add_special_tokens(
      {'additional_special_tokens': ['<answer>', '<context>']}
    )

    # check if cuda available
    self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # load model
    saved = torch.load(pretrained_path, map_location=self.device)
    config = T5Config(decoder_start_token_id=self.tokenizer.pad_token_id)
    self.model = T5ForConditionalGeneration(config).from_pretrained(self.PRETRAINED_MODEL)
    self.model.resize_token_embeddings(len(self.tokenizer))
    self.model.load_state_dict(saved['model_state_dict'])

  def process_text(self, text):    
    # name entity recognition to collect words or sentences that can be used as answer
    answers = self.nlp(text).ents

    # input of question answer model will have format
    # <answer> answer <context> context
    input_texts = []
    input_answers = []
    for answer in answers:
      input_answers.append(answer)
      input_texts.append('<answer> %s <context> %s' % (answer, text))
    
    return input_texts, input_answers
  
  def generate(self, text):
    input_texts, answers = self.process_text(text)

    # encoder and decoder
    input_sequences = self.tokenizer(
        input_texts, 
        padding='max_length', 
        max_length=self.SEQ_LENGTH,
        truncation=True,
        return_tensors="pt"
    )
    output_sequences = self.model.generate(
        input_ids=input_sequences['input_ids'],
        attention_mask=input_sequences['attention_mask']
    )
    questions = self.tokenizer.batch_decode(output_sequences, skip_special_tokens=True)

    # return answers and questions
    return answers, questions

In [8]:
# Pre-Trained Model of Machine Translation

from transformers import AutoModelWithLMHead, AutoTokenizer
translation_model = AutoModelWithLMHead.from_pretrained("Helsinki-NLP/opus-mt-en-id")
translation_tokenizer = AutoTokenizer.from_pretrained("Helsinki-NLP/opus-mt-en-id")



Downloading:   0%|          | 0.00/1.11k [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/278M [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/42.0 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/777k [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/782k [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/1.20M [00:00<?, ?B/s]

# Main

In [9]:
qg = QuestionGenerator()

Downloading:   0%|          | 0.00/773k [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/1.32M [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/1.17k [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/850M [00:00<?, ?B/s]

In [10]:
text = """As the acting Grand Master for the Knights, Jean takes all of her responsibilities and duties associated with the role seriously, regardless of how minor the tasks may seem, such as finding a lost cat. Due to this, Jean often exhausts herself trying to complete commissions from the locals, much to the concern of her fellow members.

Her devotion to her duties stems from two reasons; her upbringing as a child, and Varka's teachings. Even though he consistently takes his duties lightly, his relaxed and unruly personality has contributed to her growth.[2] She shows no resentment towards his attitude, instead vowing to ensure that the city will be more prosperous and welcoming when he returns. Her work ethic makes her well-liked by both Mondstadt's citizens, her fellow members, and is noted by other organizations. Although she prefers using peaceful methods to solve problems, she will not hesitate to use force if necessary.

She sees Vennessa as a role model because of her exploits and how she left an impressive legacy behind and works tirelessly to maintain her legacy and Mondstadt's safety. Whenever she feels troubled or confused, she often heads to the Great Tree in Windrise.[3]

She has a habit of calling Diluc her senior, even well after he left the Knights. Despite this, she acknowledges that they both share the same vision of protecting Mondstadt."""

In [11]:
text_fix = spell_correction(text)

In [12]:
text_fix

"As the acting Grand Master for the Knights Jean takes all of her responsibilities and duties associated with the role seriously regardless of how minor the tasks may seem such as finding a lost cat Due to this Jean often exhausts herself trying to complete commissions from the locals much to the concern of her fellow members Her devotion to her duties stems from two reasons her upbringing as a child and Varka's teachings Even though he consistently takes his duties lightly his relaxed and unruly personality has contributed to her growth She shows no resentment towards his attitude instead vowing to ensure that the city will be more prosperous and welcoming when he returns Her work ethic makes her well liked by both Mondstadt's citizens her fellow members and is noted by other organizations Although she prefers using peaceful methods to solve problems she will not hesitate to use force if necessary She sees Vennessa as a role model because of her exploits and how she left an impressive

In [13]:
answers, questions = qg.generate(text_fix.lower())

In [14]:
for answer, question in zip(answers, questions):
  print(f'{question} ({answer})')

What is the role of mondstadt's acting grand master? (jean)
jean mccartney is the acting grand master for the knights despite (two)


In [29]:
indo_answers = []
indo_questions = []

for answer, question in zip(answers, questions):
  # translate answer
  inputs = translation_tokenizer.encode(str(answer), return_tensors="pt")
  outputs = translation_model.generate(inputs, max_length=128, num_beams=4, early_stopping=True)
  indo_answers.append(translation_tokenizer.decode(outputs[0])[6:])

  # translate question
  inputs = translation_tokenizer.encode(question, return_tensors="pt")
  outputs = translation_model.generate(inputs, max_length=128, num_beams=4, early_stopping=True)
  indo_questions.append(translation_tokenizer.decode(outputs[0])[6:])

In [30]:
for answer, question in zip(indo_answers, indo_questions):
  print(f'{question} ({answer})')

Apa peran mondstadt's bertindak grand master? (jean)
Jean mcartney adalah bertindak master besar untuk ksatria meskipun (dua)
