In [None]:
import pandas as pd
import tensorflow as tf
from tensorflow.keras.layers import TextVectorization
import pathlib
import random
import string
import re
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers
import sklearn
from sklearn.model_selection import train_test_split

In [None]:
# Colab only: mount Google Drive so the dataset files under /content/drive can be read.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Tab-separated English/Korean sentence pairs. Column names are supplied here,
# so the first line of the file is read as data; the third field lands in 'etc'.
file_path1 = '/content/drive/MyDrive/kor-eng/kor.txt'

df1 = pd.read_csv(
    file_path1,
    sep='\t',
    names=['eng', 'kor', 'etc'],
    encoding='UTF-8',
)

In [None]:
# Keep only the sentence pair. Selecting the wanted columns yields the same frame
# as dropping 'etc', but does not raise KeyError when this cell is run a second time.
df1 = df1[['eng', 'kor']]

In [None]:
# Second parallel corpus; the 'eng_sent' and 'kor_sent' columns are picked from it
# in the next cell.
file_path2 = '/content/drive/MyDrive/translate/conversations.csv'

df2 = pd.read_csv(filepath_or_buffer=file_path2, encoding='UTF-8')

In [None]:
# Keep just the sentence pair and give it the same column names as df1.
df2 = df2[['eng_sent', 'kor_sent']].rename(
    columns={'eng_sent': 'eng', 'kor_sent': 'kor'}
)

In [None]:
# Stack both corpora row-wise and renumber the index from 0.
eng_kor = pd.concat(objs=[df1, df2], ignore_index=True)

eng_kor

In [None]:
# Wrap every Korean target sentence in the "[start]" / "[end]" markers.
# Rows that already begin with the start marker are skipped, so running this
# cell again does not wrap the same sentence twice.
needs_markers = ~eng_kor['kor'].str.startswith("[start] ", na=False)
eng_kor.loc[needs_markers, 'kor'] = eng_kor.loc[needs_markers, 'kor'].apply(
    lambda a: "[start] " + a + " [end]"
)

In [None]:
# Preview the first rows of the combined frame after the markers were added.
eng_kor.head()

In [None]:
# --- Tokenization ---
vocab_size = 20000       # vocabulary cap for both vectorizers, the embeddings and the output layer
sequence_length = 20     # token ids per sentence after padding/truncation

# --- Model size ---
embed_dim = 256          # embedding width (also used as the attention key_dim)
latent_dim = 512         # hidden width of the feed-forward projections
num_heads = 4            # attention heads per MultiHeadAttention layer

# --- Training ---
batch_size = 64
validation_split = 0.2   # passed as test_size to train_test_split
# 30 epochs were used at first, but the validation metrics stayed almost flat and
# the model was drifting toward overfitting, so this was reduced to 15.
epochs = 15

In [None]:
# Characters removed during standardization: ASCII punctuation plus "¿",
# except the square brackets, which must survive so that the "[start]" and
# "[end]" markers stay intact.
strip_chars = (string.punctuation + "¿").replace("[", "").replace("]", "")

print(strip_chars)

def eng_standardize(input_text):
    """Lower-case the text and delete every character listed in ``strip_chars``.

    Used as the ``standardize`` callable of the English TextVectorization layer.
    """
    strip_pattern = '[%s]' % re.escape(strip_chars)
    lowered = tf.strings.lower(input_text)
    return tf.strings.regex_replace(lowered, strip_pattern, '')

def kor_standardize(input_text):
    """Delete every character listed in ``strip_chars``; case is left untouched.

    Used as the ``standardize`` callable of the Korean TextVectorization layer.
    """
    strip_pattern = '[%s]' % re.escape(strip_chars)
    cleaned = tf.strings.regex_replace(input_text, strip_pattern, '')
    return cleaned

# English side: each sentence becomes `sequence_length` integer token ids.
eng_vector = TextVectorization(
    standardize=eng_standardize,
    max_tokens=vocab_size,
    output_mode='int',
    output_sequence_length=sequence_length,
)

# Korean side: one extra position, because `preprocess` slices the ids into a
# decoder input ([:, :-1]) and a target ([:, 1:]) of `sequence_length` each.
kor_vector = TextVectorization(
    standardize=kor_standardize,
    max_tokens=vocab_size,
    output_mode='int',
    output_sequence_length=sequence_length + 1,
)

# Build both vocabularies from the full combined corpus.
eng_vector.adapt(eng_kor['eng'].tolist())
kor_vector.adapt(eng_kor['kor'].tolist())

In [None]:
def preprocess(english, korean):
    """Vectorize a batch of sentence pairs into ``(model_inputs, targets)``.

    The Korean ids are split so that the decoder input drops the last position
    and the target drops the first one; the target at step t is therefore the
    token that follows decoder input t.

    Returns:
        A tuple of (dict with "encoder_inputs" and "decoder_inputs", target ids).
    """
    source_ids = eng_vector(english)
    target_ids = kor_vector(korean)

    features = {
        "encoder_inputs": source_ids,
        "decoder_inputs": target_ids[:, :-1],
    }
    labels = target_ids[:, 1:]
    return (features, labels)

def make_df(df, batch_size, mode):
    """Build a batched ``tf.data`` pipeline of vectorized sentence pairs.

    Args:
        df: DataFrame with string columns 'eng' and 'kor'.
        batch_size: number of sentence pairs per batch.
        mode: "train" shuffles the pairs; any other value keeps the frame order.

    Returns:
        A ``tf.data.Dataset`` that yields, per batch, what ``preprocess`` returns.
    """
    dataset = tf.data.Dataset.from_tensor_slices((list(df['eng']), list(df['kor'])))

    is_training = mode == "train"
    if is_training:
        # The buffer spans the whole frame so every pair can land in any batch.
        # shuffle() needs a positive buffer size, hence the max() guard.
        dataset = dataset.shuffle(max(len(df), 1))

    dataset = dataset.batch(batch_size)
    dataset = dataset.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)

    if not is_training:
        # Only the unshuffled pipeline is cached. A cache() placed after
        # shuffle() would store the first epoch's order and replay it in every
        # later epoch, which defeats the shuffle.
        dataset = dataset.cache()

    # prefetch goes last so it overlaps producing the next batch with training.
    return dataset.prefetch(tf.data.AUTOTUNE)

# Hold out `validation_split` of the pairs. The fixed random_state keeps the
# split, and therefore the reported validation numbers, identical across reruns.
train, test = train_test_split(eng_kor, test_size=validation_split, random_state=42)
train.shape, test.shape

In [None]:
# mode='train' turns shuffling on inside make_df; any other mode keeps the order.
X_train = make_df(train, batch_size, 'train')
X_test = make_df(test, batch_size, 'test')

# Print one batch to eyeball the structure of the inputs and targets.
for batch in X_train.take(count=1):
    print(batch)

In [None]:
class TransformerEncoder(layers.Layer):
    """One Transformer encoder block.

    Self-attention followed by a two-layer feed-forward projection; each
    sub-layer is wrapped in a residual connection and layer normalization.

    Args:
        embed_dim: width of the input/output vectors (also used as ``key_dim``).
        dense_dim: hidden width of the feed-forward projection.
        num_heads: number of attention heads.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, mask=None):
        """Apply the block to ``inputs``.

        Args:
            inputs: embedded sequence; the last axis must have size ``embed_dim``
                for the residual additions to line up.
            mask: optional boolean padding mask with one entry per position of
                ``inputs``; masked-out positions are not attended to.
        """
        # Default to "no mask" so the layer also works when Keras passes no
        # mask; previously `padding_mask` was unbound in that case.
        padding_mask = None
        if mask is not None:
            # (batch, 1, keys): the documented attention_mask layout, broadcast
            # over the query axis. This is the same layout TransformerDecoder
            # builds for its own padding mask.
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")

        attention_output = self.attention(
            query=inputs, value=inputs, key=inputs, attention_mask=padding_mask
        )
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "dense_dim": self.dense_dim,
                "num_heads": self.num_heads,
            }
        )
        return config


class PositionalEmbedding(layers.Layer):
    """Sum of a token embedding and a learned embedding of the position index.

    Args:
        sequence_length: number of distinct positions the layer can embed.
        vocab_size: number of distinct token ids the layer can embed.
        embed_dim: width of both embeddings.
    """

    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim
        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )

    def call(self, inputs):
        """Embed token ids and add the embedding of each position 0..length-1."""
        num_positions = tf.shape(inputs)[-1]
        position_ids = tf.range(start=0, limit=num_positions, delta=1)
        token_part = self.token_embeddings(inputs)
        position_part = self.position_embeddings(position_ids)
        return token_part + position_part

    def compute_mask(self, inputs, mask=None):
        """Mark every position whose token id is not 0 as valid."""
        is_real_token = tf.math.not_equal(inputs, 0)
        return is_real_token


class TransformerDecoder(layers.Layer):
    """One Transformer decoder block.

    Causal self-attention over the target sequence, cross-attention over the
    encoder outputs, then a two-layer feed-forward projection. Each sub-layer
    is wrapped in a residual connection and layer normalization.

    Args:
        embed_dim: width of the input/output vectors (also used as ``key_dim``).
        latent_dim: hidden width of the feed-forward projection.
        num_heads: number of attention heads.
    """

    def __init__(self, embed_dim, latent_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.latent_dim = latent_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(latent_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, encoder_outputs, mask=None):
        """Apply the block.

        Args:
            inputs: embedded target sequence.
            encoder_outputs: encoder representation attended to by the second
                attention layer.
            mask: optional boolean padding mask with one entry per position of
                ``inputs``.
        """
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            # Combine "not padding" with "not in the future". Both conditions
            # are about target positions, so the result belongs to
            # self-attention, whose key axis is the target sequence.
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
            self_attention_mask = tf.minimum(padding_mask, causal_mask)
        else:
            # No padding information available: fall back to the causal mask
            # alone (previously this path hit an unbound `padding_mask`).
            self_attention_mask = causal_mask

        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=self_attention_mask,
        )
        out_1 = self.layernorm_1(inputs + attention_output_1)

        # Cross-attention: the key axis here is the ENCODER sequence. The mask
        # built above describes target positions, so applying it here (as the
        # code used to) limited target step i to encoder positions <= i.
        # Every target step may look at the whole source sentence instead.
        # NOTE(review): encoder padding is not masked here because this
        # signature only receives the mask of `inputs`.
        attention_output_2 = self.attention_2(
            query=out_1,
            value=encoder_outputs,
            key=encoder_outputs,
        )
        out_2 = self.layernorm_2(out_1 + attention_output_2)

        proj_output = self.dense_proj(out_2)
        return self.layernorm_3(out_2 + proj_output)

    def get_causal_attention_mask(self, inputs):
        """Return an int32 mask of shape (batch, length, length).

        Entry [b, i, j] is 1 when j <= i (query i may attend to key j) and 0
        otherwise; the same lower-triangular matrix is tiled over the batch.
        """
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        # Repeat the single (1, length, length) matrix once per batch element.
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)],
            axis=0,
        )
        return tf.tile(mask, mult)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "latent_dim": self.latent_dim,
                "num_heads": self.num_heads,
            }
        )
        return config

In [None]:
# ----- Encoder: token ids -> embeddings -> one Transformer encoder block -----
encoder_inputs = keras.Input(shape=(None,), dtype='int64', name='encoder_inputs')
encoder_embedded = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, latent_dim, num_heads)(encoder_embedded)
encoder = keras.Model(encoder_inputs, encoder_outputs)

# ----- Decoder: built as its own model that takes the target ids together with
# an already-encoded source sequence -----
decoder_inputs = keras.Input(shape=(None,), dtype='int64', name='decoder_inputs')
encoded_seq_inputs = keras.Input(shape=(None, embed_dim), name='decoder_state_inputs')
decoder_embedded = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
decoder_hidden = TransformerDecoder(embed_dim, latent_dim, num_heads)(
    decoder_embedded, encoded_seq_inputs
)
decoder_hidden = layers.Dropout(0.4)(decoder_hidden)
# One probability per vocabulary entry at every target position.
decoder_outputs = layers.Dense(vocab_size, activation='softmax')(decoder_hidden)
decoder = keras.Model([decoder_inputs, encoded_seq_inputs], decoder_outputs)

# ----- End-to-end model: feed the encoder outputs into the decoder -----
decoder_outputs = decoder([decoder_inputs, encoder_outputs])
model = keras.Model(
    inputs=[encoder_inputs, decoder_inputs],
    outputs=decoder_outputs,
    name='transformer',
)

In [None]:
# Print the layer-by-layer summary of the end-to-end model.
model.summary()

In [None]:
# Draw the model graph, annotated with tensor shapes.
keras.utils.plot_model(model, show_shapes=True)

In [None]:
# Targets are integer token ids and the model outputs per-token probabilities,
# hence the sparse categorical cross-entropy loss.
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

In [None]:
# Train for `epochs` passes; the held-out pairs are scored after every epoch.
model.fit(
    X_train,
    validation_data=X_test,
    epochs=epochs,
)

In [None]:
# Reverse lookup: integer token id -> Korean vocabulary string.
kor_vocab = kor_vector.get_vocabulary()
kor_index_lookup = dict(enumerate(kor_vocab))

def decode_sequence(model, input_sentence):
    """Greedily translate one English sentence, one token per step.

    Starting from "[start]", the text decoded so far is re-vectorized at every
    step, the model is run on it, and the most probable token at the current
    position is appended. Decoding stops once "[end]" is produced or after
    ``sequence_length`` steps.

    Args:
        model: the trained model taking [encoder ids, decoder ids].
        input_sentence: English source sentence (a plain string).

    Returns:
        The decoded string, including the leading "[start]" and, when it was
        produced, the trailing "[end]".
    """
    source_ids = eng_vector([input_sentence])
    decoded_sentence = '[start]'

    for step in range(sequence_length):
        target_ids = kor_vector([decoded_sentence])[:, :-1]
        predictions = model([source_ids, target_ids])

        # Greedy choice: highest-probability token at the current position.
        next_token_id = np.argmax(predictions[0, step, :])
        next_token = kor_index_lookup[next_token_id]
        decoded_sentence = decoded_sentence + " " + next_token

        if next_token == "[end]":
            return decoded_sentence

    return decoded_sentence

In [None]:
# Translate 10 randomly drawn rows (drawn with replacement from the full corpus)
# and print each result next to its reference sentence.
for row_position in np.random.choice(len(eng_kor), 10):
    pair = eng_kor.iloc[row_position]
    translated = decode_sequence(model, pair['eng'])

    print("English:", pair['eng'])
    print("korean:", pair['kor'])
    print("Translated:", translated)

In [None]:
# Try the model on a hand-written sentence.
decode_sequence(model, 'cat is cute')

적은 데이터 양과 부족한 실력에 의해 좋은 번역 결과물을 얻을 수 없었다.
추후 말뭉치의 수를 늘려 가고, 자체 학습을 통해 방법을 변화시켜 가며 모델을 업데이트하거나 새로운 모델을 만들어 봐야겠다.