In [57]:
import tensorflow as tf
from tensorflow.keras.layers import Embedding,GRU,Dense,AdditiveAttention,TextVectorization
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from tensorflow.keras.optimizers import Adam

import csv

import typing
from typing import Any, Tuple

In [58]:
from google.colab import drive
drive.mount('/content/drive')

with open("/content/drive/MyDrive/Translate/data/eng-ind.csv") as f:
  csv_reader = csv.reader(f)
  next(csv_reader)

  input_data = list()
  target_data = list()

  for row in csv_reader:
    input_data.append(row[0])
    target_data.append(row[1])

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [59]:
BUFFER_SIZE = len(input_data)
BATCH_SIZE = 64

dataset = tf.data.Dataset.from_tensor_slices((input_data, target_data)).shuffle(BUFFER_SIZE)
dataset = dataset.batch(BATCH_SIZE)

In [60]:
def tf_lower_and_split_punct(text):
  text = tf.strings.lower(text)
  text = tf.strings.regex_replace(text, '[^ a-z.?!,¿]', '')
  text = tf.strings.regex_replace(text, '[.?!,¿]', r' \0 ')
  text = tf.strings.strip(text)

  text = tf.strings.join(['[START]', text, '[END]'], separator=' ')
  return text

In [61]:
max_vocab_size = 5000

input_text_processor = TextVectorization(
    standardize=tf_lower_and_split_punct,
    max_tokens=max_vocab_size)

output_text_processor = TextVectorization(
    standardize=tf_lower_and_split_punct,
    max_tokens=max_vocab_size)

input_text_processor.adapt(input_data)
output_text_processor.adapt(target_data)

In [62]:
embedding_dim = 256
units = 1024

In [63]:
class Encoder(tf.keras.layers.Layer):
  def __init__(self, input_vocab_size, embedding_dim, enc_units):
    super(Encoder, self).__init__()
    self.enc_units = enc_units
    self.input_vocab_size = input_vocab_size
    self.embedding = Embedding(self.input_vocab_size,
                              embedding_dim)
    self.gru = GRU(self.enc_units,
                  return_sequences=True,
                  return_state=True,
                  recurrent_initializer='glorot_uniform')

  def call(self, tokens, state=None):
    vectors = self.embedding(tokens)
    output, state = self.gru(vectors, initial_state=state)
    return output, state

In [64]:
class BahdanauAttention(tf.keras.layers.Layer):
  def __init__(self, units):
    super().__init__()
    self.W1 = Dense(units, use_bias=False)
    self.W2 = Dense(units, use_bias=False)
    self.attention = AdditiveAttention()

  def call(self, query, value, mask):
    w1_query = self.W1(query)
    w2_key = self.W2(value)

    query_mask = tf.ones(tf.shape(query)[:-1], dtype=bool)
    value_mask = mask

    context_vector, attention_weights = self.attention(
        inputs = [w1_query, value, w2_key],
        mask=[query_mask, value_mask],
        return_attention_scores = True,
    )

    return context_vector, attention_weights

In [65]:
class DecoderInput(typing.NamedTuple):
  new_tokens: Any
  enc_output: Any
  mask: Any

class DecoderOutput(typing.NamedTuple):
  logits: Any
  attention_weights: Any

In [66]:
class Decoder(tf.keras.layers.Layer):
  def __init__(self, output_vocab_size, embedding_dim, dec_units):
    super(Decoder, self).__init__()
    self.dec_units = dec_units
    self.output_vocab_size = output_vocab_size
    self.embedding_dim = embedding_dim

    self.embedding = Embedding(self.output_vocab_size,
                              embedding_dim)
    self.gru = GRU(self.dec_units,
                  return_sequences=True,
                  return_state=True,
                  recurrent_initializer='glorot_uniform')

    self.attention = BahdanauAttention(self.dec_units)

    self.Wc = Dense(dec_units, activation=tf.math.tanh,
                    use_bias=False)

    self.fc = Dense(self.output_vocab_size)
  
  def call(self,
          inputs: DecoderInput,
          state=None) -> Tuple[DecoderOutput, tf.Tensor]:

    vectors = self.embedding(inputs.new_tokens)
    rnn_output, state = self.gru(vectors, initial_state=state)

    context_vector, attention_weights = self.attention(
        query=rnn_output, value=inputs.enc_output, mask=inputs.mask)

    context_and_rnn_output = tf.concat([context_vector, rnn_output], axis=-1)

    attention_vector = self.Wc(context_and_rnn_output)

    logits = self.fc(attention_vector)

    return DecoderOutput(logits, attention_weights), state
  


In [67]:
class TrainTranslator(tf.keras.Model):
  def __init__(self, embedding_dim, units,
               input_text_processor,
               output_text_processor, 
               use_tf_function=True):
    super().__init__()
    encoder = Encoder(input_text_processor.vocabulary_size(),
                      embedding_dim, units)
    decoder = Decoder(output_text_processor.vocabulary_size(),
                      embedding_dim, units)

    self.encoder = encoder
    self.decoder = decoder
    self.input_text_processor = input_text_processor
    self.output_text_processor = output_text_processor
    self.use_tf_function = use_tf_function

  def train_step(self, inputs):
    if self.use_tf_function:
      return self._tf_train_step(inputs)
    else:
      return self._train_step(inputs)
  
  def _preprocess(self, input_text, target_text):

    input_tokens = self.input_text_processor(input_text)
    target_tokens = self.output_text_processor(target_text)

    input_mask = input_tokens != 0

    target_mask = target_tokens != 0

    return input_tokens, input_mask, target_tokens, target_mask

  def _train_step(self, inputs):
    input_text, target_text = inputs  

    (input_tokens, input_mask,
    target_tokens, target_mask) = self._preprocess(input_text, target_text)

    max_target_length = tf.shape(target_tokens)[1]

    with tf.GradientTape() as tape:
      enc_output, enc_state = self.encoder(input_tokens)

      dec_state = enc_state
      loss = tf.constant(0.0)

      for t in tf.range(max_target_length-1):
        new_tokens = target_tokens[:, t:t+2]
        step_loss, dec_state = self._loop_step(new_tokens, input_mask,
                                              enc_output, dec_state)
        loss = loss + step_loss

      average_loss = loss / tf.reduce_sum(tf.cast(target_mask, tf.float32))

    variables = self.trainable_variables 
    gradients = tape.gradient(average_loss, variables)
    self.optimizer.apply_gradients(zip(gradients, variables))

    return {'batch_loss': average_loss}

  def _loop_step(self, new_tokens, input_mask, enc_output, dec_state):
    input_token, target_token = new_tokens[:, 0:1], new_tokens[:, 1:2]

    decoder_input = DecoderInput(new_tokens=input_token,
                                enc_output=enc_output,
                                mask=input_mask)

    dec_result, dec_state = self.decoder(decoder_input, state=dec_state)

    y = target_token
    y_pred = dec_result.logits
    step_loss = self.loss(y, y_pred)

    return step_loss, dec_state
  
  @tf.function(input_signature=[[tf.TensorSpec(dtype=tf.string, shape=[None]),
                               tf.TensorSpec(dtype=tf.string, shape=[None])]])
  def _tf_train_step(self, inputs):
    return self._train_step(inputs)

In [68]:
class MaskedLoss(tf.keras.losses.Loss):
  def __init__(self):
    self.name = 'masked_loss'
    self.loss = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=True, reduction='none')

  def __call__(self, y_true, y_pred):
    loss = self.loss(y_true, y_pred)
    mask = tf.cast(y_true != 0, tf.float32)
    loss *= mask
    return tf.reduce_sum(loss)

In [69]:
class BatchLoss(tf.keras.callbacks.Callback):
  def __init__(self, key):
    self.key = key
    self.logs = []

  def on_train_batch_end(self, n, logs):
    self.logs.append(logs[self.key])

batch_loss = BatchLoss('batch_loss')

In [70]:
model = TrainTranslator(
    embedding_dim, units,
    input_text_processor=input_text_processor,
    output_text_processor=output_text_processor)

model.compile(
    optimizer=Adam(),
    loss=MaskedLoss(),
)

In [71]:
model.fit(dataset, epochs=10,callbacks=[batch_loss])

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x7ff16176ba10>

In [72]:
import numpy as np

In [73]:
class Translator(tf.Module):

  def __init__(self, encoder, decoder, input_text_processor,
               output_text_processor):
    self.encoder = encoder
    self.decoder = decoder
    self.input_text_processor = input_text_processor
    self.output_text_processor = output_text_processor

    self.output_token_string_from_index = (
        tf.keras.layers.StringLookup(
            vocabulary=output_text_processor.get_vocabulary(),
            mask_token='',
            invert=True))

    index_from_string = tf.keras.layers.StringLookup(
        vocabulary=output_text_processor.get_vocabulary(), mask_token='')
    token_mask_ids = index_from_string(['', '[UNK]', '[START]']).numpy()

    token_mask = np.zeros([index_from_string.vocabulary_size()], dtype=np.bool)
    token_mask[np.array(token_mask_ids)] = True
    self.token_mask = token_mask

    self.start_token = index_from_string(tf.constant('[START]'))
    self.end_token = index_from_string(tf.constant('[END]'))
  
  def tokens_to_text(self, result_tokens):
    result_text_tokens = self.output_token_string_from_index(result_tokens)

    result_text = tf.strings.reduce_join(result_text_tokens,
                                        axis=1, separator=' ')

    result_text = tf.strings.strip(result_text)
    return result_text

  def sample(self, logits, temperature):
    token_mask = self.token_mask[tf.newaxis, tf.newaxis, :]

    logits = tf.where(self.token_mask, -np.inf, logits)

    if temperature == 0.0:
      new_tokens = tf.argmax(logits, axis=-1)
    else: 
      logits = tf.squeeze(logits, axis=1)
      new_tokens = tf.random.categorical(logits/temperature,
                                          num_samples=1)

    return new_tokens
  
  def translate(self,
                input_text, *,
                max_length=50,
                return_attention=True,
                temperature=1.0):
    batch_size = tf.shape(input_text)[0]
    input_tokens = self.input_text_processor(input_text)
    enc_output, enc_state = self.encoder(input_tokens)

    dec_state = enc_state
    new_tokens = tf.fill([batch_size, 1], self.start_token)

    result_tokens = []
    attention = []
    done = tf.zeros([batch_size, 1], dtype=tf.bool)

    for _ in range(max_length):
      dec_input = DecoderInput(new_tokens=new_tokens,
                              enc_output=enc_output,
                              mask=(input_tokens!=0))

      dec_result, dec_state = self.decoder(dec_input, state=dec_state)

      attention.append(dec_result.attention_weights)

      new_tokens = self.sample(dec_result.logits, temperature)

      done = done | (new_tokens == self.end_token)

      new_tokens = tf.where(done, tf.constant(0, dtype=tf.int64), new_tokens)

      result_tokens.append(new_tokens)

      if tf.executing_eagerly() and tf.reduce_all(done):
        break

    result_tokens = tf.concat(result_tokens, axis=-1)
    result_text = self.tokens_to_text(result_tokens)

    if return_attention:
      attention_stack = tf.concat(attention, axis=1)
      return {'text': result_text, 'attention': attention_stack}
    else:
      return {'text': result_text}
  
  @tf.function(input_signature=[tf.TensorSpec(dtype=tf.string, shape=[None])])
  def tf_translate(self, input_text):
    return self.translate(input_text)

In [74]:
translator = Translator(
    encoder=model.encoder,
    decoder=model.decoder,
    input_text_processor=input_text_processor,
    output_text_processor=output_text_processor,
)

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations


In [85]:
input_text = tf.constant([
    'what dou you mean?',
    'This is translate feature', 
])

result = translator.tf_translate(
    input_text = input_text)


for i in result['text']:
  print(i.numpy().decode())

apa yang kamu maksudkan ?
ini adalah kamarku untuk diangkat .


In [75]:
tf.saved_model.save(translator, '/content/drive/MyDrive/Translate/saved_model',
                    signatures={'serving_default': translator.tf_translate})



INFO:tensorflow:Assets written to: /content/drive/MyDrive/Translate/saved_model/assets


INFO:tensorflow:Assets written to: /content/drive/MyDrive/Translate/saved_model/assets
