In [1]:
import models
from loader import pipeline
import tensorflow as tf
from tensorflow.keras.layers import TextVectorization

In [2]:
# Forward slashes resolve on every OS. The previous backslash-separated literal
# relied on "\s" and "\d" being unrecognized escape sequences (which newer
# Python versions warn about) and only worked as a path on Windows.
data_path = "dataset/set_2/dialogs.txt"
train = pipeline.create_dataset(data_path)

# Cap the vocabulary at 5000 tokens; text standardization is delegated to the
# project's pipeline helper.
vectorizer = TextVectorization(max_tokens=5000,standardize=pipeline.add_start_and_end_tokens)

# NOTE(review): the vocabulary is learned from the "question" field only, yet
# the training loop also encodes the "answer" field with this vectorizer, so
# words that appear only in answers would presumably fall back to the
# out-of-vocabulary token. Confirm this is intended.
vectorizer.adapt(train.map(lambda x: x["question"]))

In [5]:
# Vocabulary held by the adapted vectorizer, as returned by get_vocabulary().
vocab = vectorizer.get_vocabulary()

In [9]:
# Model hyperparameters; these are forwarded to the Transformer constructor.
num_layers = 1
model_dim = 64
dff = 256  # presumably the feed-forward inner width — confirm against the Transformer implementation
num_heads = 4
dropout_rate = 0.2
# Vocabulary size, used for both the input and the target side of the model.
vocab_len = len(vocab)

In [7]:
from models.transformer.transformer import Transformer

In [11]:
# NOTE(review): MAX_TOKENS is not referenced by any other visible cell —
# confirm whether it is still needed.
MAX_TOKENS = 128
# Build the model from the hyperparameters above. Input and target share the
# same vocabulary size because a single vectorizer encodes both sides.
transformer = Transformer(
    num_layers=num_layers,
    model_dim=model_dim,
    num_heads=num_heads,
    dff=dff,
    input_vocab_size=vocab_len,
    target_vocab_size=vocab_len,
    dropout_rate=dropout_rate)

In [12]:
from models.custom_metrics.metrics import loss_function, accuracy_function

In [13]:

# Running means over the values reported by each training step; the training
# loop resets them at the start of every epoch.
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.Mean(name='train_accuracy')

In [15]:
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
  """Learning-rate schedule with a linear warmup followed by inverse-sqrt decay.

  The rate at a given step is
  ``rsqrt(d_model) * min(rsqrt(step), step * warmup_steps ** -1.5)``.

  Args:
    d_model: Model width used to scale the learning rate.
    warmup_steps: Number of steps over which the rate increases linearly
      before the inverse-square-root decay takes over.
  """

  def __init__(self, d_model, warmup_steps=4000):
    super().__init__()

    # Keep the caller's raw value for get_config(); the float32 tensor below
    # is what __call__ computes with.
    self._d_model_config = d_model
    self.d_model = tf.cast(d_model, tf.float32)

    self.warmup_steps = warmup_steps

  def __call__(self, step):
    """Return the learning rate for the given optimizer step."""
    # Optimizers may hand over their integer iteration counter, and
    # tf.math.rsqrt only accepts floating-point input, so cast first.
    step = tf.cast(step, tf.float32)

    arg1 = tf.math.rsqrt(step)
    arg2 = step * (self.warmup_steps ** -1.5)

    return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

  def get_config(self):
    """Return the constructor arguments so the schedule can be serialized."""
    return {
        "d_model": self._d_model_config,
        "warmup_steps": self.warmup_steps,
    }


# Scale the schedule by `model_dim`, the same width the Transformer is built
# with. The previously used name `d_model` is never defined in the visible
# cells and raises NameError on a fresh kernel.
learning_rate = CustomSchedule(model_dim)

optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.9, beta_2=0.98,
                                     epsilon=1e-9)

In [14]:
EPOCHS = 50

# train_step is compiled into a TF graph by @tf.function, and a traced graph is
# specialized to the exact shapes of the tensors it receives. Sequence lengths
# vary between batches and the final batch can be smaller, so without a fixed
# input_signature the function would be re-traced repeatedly. Declaring both
# dimensions as None keeps a single trace valid for every batch.

train_step_signature = [
    tf.TensorSpec(shape=(None, None), dtype=tf.int64)
    for _ in range(2)  # one spec per train_step argument: `inp`, then `tar`
]


@tf.function(input_signature=train_step_signature)
def train_step(inp, tar):
  """Run one optimization step on a batch and update the running metrics.

  Args:
    inp: Rank-2 int64 tensor of token ids passed as the first model input.
    tar: Rank-2 int64 tensor of target token ids. All positions but the last
      are fed to the model; all positions but the first are the labels.
  """
  # Shift the target by one position: the model sees tokens [0, n-1) and is
  # scored against tokens [1, n).
  decoder_input, labels = tar[:, :-1], tar[:, 1:]

  with tf.GradientTape() as tape:
    preds, _ = transformer([inp, decoder_input], training=True)
    batch_loss = loss_function(labels, preds)

  weights = transformer.trainable_variables
  grads = tape.gradient(batch_loss, weights)
  optimizer.apply_gradients(zip(grads, weights))

  # Fold this batch into the epoch-level running averages.
  train_loss(batch_loss)
  train_accuracy(accuracy_function(labels, preds))

In [None]:
for epoch in range(EPOCHS):
  # Start every epoch with fresh running averages. `reset_state()` is the
  # supported spelling; `reset_states()` is a deprecated alias that newer
  # Keras releases no longer provide.
  train_loss.reset_state()
  train_accuracy.reset_state()

  for batch, inputs in enumerate(train):
    # Questions and answers are tokenized with the same vectorizer before
    # being handed to the compiled training step.
    train_step(vectorizer(inputs["question"]), vectorizer(inputs["answer"]))

    # Report the running averages every 50 batches.
    if batch % 50 == 0:
      print(f'Epoch {epoch + 1} Batch {batch} Loss {train_loss.result():.4f} Accuracy {train_accuracy.result():.4f}')

  # End-of-epoch summary.
  print(f'Epoch {epoch + 1} Loss {train_loss.result():.4f} Accuracy {train_accuracy.result():.4f}')
