<a href="https://colab.research.google.com/github/masies/CRA/blob/main/Replication_package_PreTraining.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# T5 Pre-Training

In this notebook we pre-train a T5-small model on the dataset we processed earlier.

We start by setting up the environment: connecting Colab to the GCS bucket and configuring everything for the TPU. (This Colab requires a TPU runtime with the High-RAM setting.)

In [1]:
from google.colab import auth
auth.authenticate_user()
#@title ## Set Your GCS credential
project_id = 'helical-loop-303918'#@param {type:"string"}
bucket_name = 'code_review_automation'#@param {type:"string"}

!gcloud config set project {project_id}

!gsutil cp gs://{bucket_name}/replication_package/requirements/requirements_preTraining.txt  requirements_preTraining.txt

!pip install -r /content/requirements_preTraining.txt

import functools
import os
import time
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)

import tensorflow.compat.v1 as tf
tf.enable_eager_execution()

import tensorflow_datasets as tfds

import t5

BASE_DIR = "gs://" + bucket_name

# Fail fast if the bucket-name form field above was left empty.
if not BASE_DIR or BASE_DIR == "gs://":
  raise ValueError("You must enter a BASE_DIR.")
ON_CLOUD = True

if ON_CLOUD:
  import tensorflow_gcs_config
  # TPU topology string; passed to MtfModel when the model is built.
  TPU_TOPOLOGY = "2x2"
  try:
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()  # TPU detection
    TPU_ADDRESS = tpu.get_master()
    print('Running on TPU:', TPU_ADDRESS)
  except ValueError:
    # RuntimeError (a regular Exception) rather than BaseException, so normal
    # `except Exception` handlers and notebook tooling treat it like any other error.
    raise RuntimeError('ERROR: Not connected to a TPU runtime; select Runtime > Change runtime type > Hardware accelerator: TPU and re-run this cell.')
  # Set credentials for GCS reading/writing from Colab and TPU. This reuses the
  # credentials from auth.authenticate_user() at the top of this cell.
  tf.config.experimental_connect_to_host(TPU_ADDRESS)
  tensorflow_gcs_config.configure_gcs_from_colab_auth()

tf.disable_v2_behavior()

# Improve logging.
from contextlib import contextmanager
import logging as py_logging

if ON_CLOUD:
  # Surface INFO-level records via the Python root logger, and keep TF's own
  # logger from forwarding to it as well (presumably to avoid duplicate lines).
  py_logging.root.setLevel('INFO')
  tf.get_logger().propagate = False

@contextmanager
def tf_verbosity_level(level):
  """Temporarily set TensorFlow's logging verbosity inside a ``with`` block.

  Args:
    level: verbosity passed to ``tf.logging.set_verbosity`` for the duration
      of the block.

  The previous verbosity is restored on exit, including when the block raises.
  """
  og_level = tf.logging.get_verbosity()
  tf.logging.set_verbosity(level)
  try:
    yield
  finally:
    # Without try/finally, an exception in the block would leave `level` set permanently.
    tf.logging.set_verbosity(og_level)

Updated property [core/project].


To take a quick anonymous survey, run:
  $ gcloud survey

Copying gs://code_review_automation/replication_package/requirements/requirements_preTraining.txt...
/ [1 files][  7.1 KiB/  7.1 KiB]                                                
Operation completed over 1 objects/7.1 KiB.                                      
Collecting huggingface-hub==0.0.8
  Downloading https://files.pythonhosted.org/packages/a1/88/7b1e45720ecf59c6c6737ff332f41c955963090a18e72acbcbeac6b25e86/huggingface_hub-0.0.8-py3-none-any.whl
Collecting mesh-tensorflow==0.1.19
[?25l  Downloading https://files.pythonhosted.org/packages/ce/10/37df0bc87ebf84e1414613176340e3aadc3697d2bd112bf63d3d4b1e848a/mesh_tensorflow-0.1.19-py3-none-any.whl (366kB)
[K     |████████████████████████████████| 368kB 8.8MB/s 
Collecting portalocker==2.0.0
  Downloading https://files.pythonhosted.org/packages/89/a6/3814b7107e0788040870e8825eebf214d72166adf656ba7d4bf14759a06a/portalocker-2.0.0-py2.py3-none

Instructions for updating:
non-resource variables are not supported in the long term


We specify the path of our masked pre-training dataset (the TSV file) in the GCS bucket.

In [2]:
# GCS location of the masked pre-training dataset (TSV format).
masked_pretraining_dataset_path = "gs://" + bucket_name + "/replication_package/dataset/pre-training/pre-training.tsv"

# Split name -> TSV file backing that split; only a training split is provided.
nq_tsv_path = dict(train=masked_pretraining_dataset_path)

We specify the paths of the model and vocabulary files of the previously trained SentencePiece model in the GCS bucket.

In [3]:
from t5.data import postprocessors as t5_postprocessors
from t5.seqio import Feature,SentencePieceVocabulary

# GCS paths of the SentencePiece model and vocabulary files trained earlier.
# Both live under "<bucket>/replication_package/code_review_model/"; note the "/"
# separating the bucket name from the object path.
vocab_model_path = "gs://" + bucket_name + "/replication_package/code_review_model/TestModel.model"
vocab_path = "gs://" + bucket_name + "/replication_package/code_review_model/TestModel.vocab"

# Short aliases for T5's task registry and TFDS task classes.
TaskRegistry = t5.data.TaskRegistry
TfdsTask = t5.data.TfdsTask

def get_default_vocabulary(extra_ids=100):
  """Build the SentencePiece vocabulary used for model inputs and targets.

  Args:
    extra_ids: number of extra sentinel ids added on top of the SentencePiece
      model (passed as ``SentencePieceVocabulary``'s second argument). Defaults
      to 100, the value this notebook was run with.

  Returns:
    A ``SentencePieceVocabulary`` backed by ``vocab_model_path``.
  """
  return SentencePieceVocabulary(vocab_model_path, extra_ids)

We read the dataset, parse each line into an input/output pair, and print a few raw examples.

In [4]:
# A single vocabulary instance shared by both features, so the SentencePiece
# model is constructed once rather than once per feature.
_default_vocabulary = get_default_vocabulary()

# add_eos=False for both features: the raw examples already end with an explicit
# "</s>" marker in the text (see the printed samples), presumably the reason no
# extra EOS is appended here.
DEFAULT_OUTPUT_FEATURES = {
    "inputs": Feature(
        vocabulary=_default_vocabulary, add_eos=False, required=True),

    "targets": Feature(
        vocabulary=_default_vocabulary, add_eos=False)
}

def nq_dataset_fn(split, shuffle_files=True):
  """Load one split of the masked pre-training TSV as a ``tf.data.Dataset``.

  Each tab-separated line is parsed into a dict with two string features,
  ``"input"`` and ``"output"``.

  Args:
    split: split name; must be a key of ``nq_tsv_path``.
    shuffle_files: ignored, since each split is stored in a single file.

  Returns:
    A dataset of ``{"input": <tf.string>, "output": <tf.string>}`` dicts.
  """
  # We only have one file for each split.
  del shuffle_files

  # Load lines from the text file as examples.
  ds = tf.data.TextLineDataset(nq_tsv_path[split])
  ds = ds.map(
      # record_defaults holds per-column *default values* (the dtype is inferred
      # from them). "" keeps both columns as strings without substituting the
      # literal text "string" for an empty field.
      functools.partial(tf.io.decode_csv, record_defaults=["", ""],
                        field_delim="\t", use_quote_delim=False),
      num_parallel_calls=tf.data.experimental.AUTOTUNE)
  ds = ds.map(lambda *ex: dict(zip(["input", "output"], ex)))
  return ds

# Print a few raw (untokenized) training examples as a sanity check.
print("A few raw train examples...")
for ex in tfds.as_numpy(nq_dataset_fn("train").take(3)):
  print(ex)

{'input': b'<extra_id_0> Bind indexed elements to the<extra_id_1> collection<extra_id_2> param name the name of the property to bind @param target the target bind<extra_id_3> @param elementBinder the binder<extra_id_4> use for elements @param<extra_id_5> Type<extra_id_6> aggregate type, may be a collection or<extra_id_7> array @param elementType the element type @param<extra_id_8> the destination for results </technical_language><code> protected<extra_id_9> void bindIndexed(ConfigurationPropertyName name, Bindable<?><extra_id_10> , AggregateElementBinder<extra_id_11> , ResolvableType aggregateType, ResolvableType elementType, Indexed<extra_id_12> result) { for (Configuration<extra_id_13> source : getContext().getSources()) {<extra_id_14> Indexed<extra_id_15> source, name, target<extra_id_16> Binder,<extra_id_17> , aggregateType,<extra_id_18> Type); if (result.wasSuppl<extra_id_19> () && result.get() != null) { return; } } } </code></s>', 'output': b'<extra_id_0> <technical_language><ex

We create a task for training T5; here we set the maximum input and output lengths to 512 tokens.

In [5]:
def preprocessing(ds):
  """Rename the raw ``input``/``output`` features to T5's ``inputs``/``targets``.

  Args:
    ds: dataset of dicts with string features ``"input"`` and ``"output"``.

  Returns:
    The dataset mapped to ``{"inputs": ..., "targets": ...}``.
  """
  def to_inputs_and_targets(ex):
    # Fields are passed through unchanged. Joining a single string with
    # tf.strings.join would return the same string, so no join is needed.
    return {'inputs': ex['input'], 'targets': ex['output']}
  return ds.map(to_inputs_and_targets, num_parallel_calls=tf.data.experimental.AUTOTUNE)

# Create a new training task. Any previous registration is removed first so that
# this cell can be re-run.
t5.data.TaskRegistry.remove('pretraining')
t5.data.TaskRegistry.add(
    "pretraining",
    t5.data.Task,
    dataset_fn=nq_dataset_fn,
    # Only a "train" TSV is configured in nq_tsv_path. Declaring a "validation"
    # split would raise a KeyError in nq_dataset_fn as soon as it was requested.
    splits=["train"],
    text_preprocessor=[preprocessing],
    output_features=DEFAULT_OUTPUT_FEATURES,
    metric_fns=[t5.evaluation.metrics.accuracy],
)

# Sanity check: tokenize one example at the 512-token input/target lengths used for training.
nq_task = t5.data.TaskRegistry.get("pretraining")
ds = nq_task.get_dataset(split="train", sequence_length={"inputs": 512, "targets": 512})
print("A  preprocessed training example...")
for ex in tfds.as_numpy(ds.take(1)):
  print(ex)

  _tokenize, num_parallel_calls=tf.data.experimental.AUTOTUNE)


A  preprocessed training example...
{'inputs_pretokenized': b'<technical_language> Creates a mutable {@code HashSet} instance containing the given<extra_id_0> . A<extra_id_1> thin convenience for creating an empty set and then calling {@link Iterators#addAll}.<extra_id_2> : if mutability is not required and the elements are non-null, use<extra_id_3> copyOf(Iterator)} instead. Note: if {@code E} is an<extra_id_4> link<extra_id_5> } type, you should create an<extra_id_6> link EnumSet} instead. Overall,<extra_id_7> method is not very<extra_id_8> and<extra_id_9> likely be deprecated<extra_id_10> the<extra_id_11> . </technical_language><extra_id_12> public static <E> HashSet<<extra_id_13> > <extra_id_14> (Iterator<? extends E> elements) { HashSet<<extra_id_15> > set = newHashSet(); Iterators<extra_id_16> (set, elements);<extra_id_17> set; } </code></s>', 'inputs': array([    7,     5,  1441,    17,  4577,   106,   219,  1635,    96,
         209,  1104,    10,   226,    40,  8797,    25,   

We set up the model size (small), the batch size (256; we were unable to replicate 512), and the path where checkpoints are saved.
Checkpoints are saved every 5000 steps.

In [6]:
from mesh_tensorflow.transformer.learning_rate_schedules import learning_rate_schedule_noam

MODEL_SIZE = "small"

# Output path where model checkpoints are saved.
MODEL_DIR = "gs://" + bucket_name + "/replication_package/model_dumps"

# Per-size settings: (model_parallelism, train_batch_size, keep_checkpoint_max).
_SIZE_SETTINGS = {
    "small": (1, 256, 16),
    "base": (2, 128, 8),
    "large": (8, 64, 4),
    "3B": (8, 16, 1),
    "11B": (8, 16, 1),
}
model_parallelism, train_batch_size, keep_checkpoint_max = _SIZE_SETTINGS[MODEL_SIZE]

tf.io.gfile.makedirs(MODEL_DIR)

# The checkpoint-retention cap is only applied when running on Cloud TPU.
checkpoint_cap = keep_checkpoint_max if ON_CLOUD else None

model = t5.models.MtfModel(
    model_dir=MODEL_DIR,
    tpu=TPU_ADDRESS,
    tpu_topology=TPU_TOPOLOGY,
    model_parallelism=model_parallelism,
    batch_size=train_batch_size,
    sequence_length={"inputs": 512, "targets": 512},
    learning_rate_schedule=learning_rate_schedule_noam,
    save_checkpoints_steps=5000,
    keep_checkpoint_max=checkpoint_cap,
)

Finally, we run the actual training procedure, using 200,000 as the total number of training steps.

In [7]:
# Gin config loaded before training, and the total number of training steps (we used 200000).
PATH_GIN_FILE = "gs://" + bucket_name + "/replication_package/code_review_model/operative_config.gin"
TRAIN_STEPS = 200000

import gin

# Parse the operative config and run training while gin's config is unlocked.
with gin.unlock_config():
    gin.parse_config_file(PATH_GIN_FILE)
    model.train("pretraining", steps=TRAIN_STEPS)

INFO:root:system_path_file_exists:gs://code_review_automation/replication_package/code_review_model/operative_config.gin
ERROR:root:Path not found: gs://code_review_automation/replication_package/code_review_model/operative_config.gin


INFO:tensorflow:Using config: {'_model_dir': 'gs://code_review_automation/replication_package/model_dumps', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': 5000, '_save_checkpoints_secs': None, '_session_config': graph_options {
  rewrite_options {
    disable_meta_optimizer: true
  }
}
cluster_def {
  job {
    name: "worker"
    tasks {
      key: 0
      value: "10.46.121.234:8470"
    }
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': None, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({'worker': ['10.46.121.234:8470']}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': 'grpc://10.46.121.234:8470', '_evaluation_master': 'grpc:/

  _tokenize, num_parallel_calls=tf.data.experimental.AUTOTUNE)


INFO:tensorflow:num_cores_per_replica: 1
INFO:tensorflow:computation_shape: [1, 1, 1, 1]
INFO:tensorflow:num_replicas: 8
INFO:tensorflow:device_assignment.topology.device_coordinates: [[[0 0 0 0]
  [0 0 0 1]
  [1 0 0 0]
  [1 0 0 1]
  [0 1 0 0]
  [0 1 0 1]
  [1 1 0 0]
  [1 1 0 1]]]
INFO:tensorflow:device_assignment.core_assignment: [[[0 0 0 0]]

 [[0 0 0 1]]

 [[1 0 0 0]]

 [[1 0 0 1]]

 [[0 1 0 0]]

 [[0 1 0 1]]

 [[1 1 0 0]]

 [[1 1 0 1]]]
INFO:tensorflow:auto_logical_to_physical_tpu logical_shape=[8] physical_shape=[2, 2, 2]
INFO:tensorflow:auto_logical_to_physical_tpu logical_to_physical = [(0, 0, 0), (0, 0, 1), (0, 1, 0), (0, 1, 1), (1, 1, 0), (1, 1, 1), (1, 0, 0), (1, 0, 1)]
INFO:tensorflow:SimdMeshImpl init: Shape[batch=8] LayoutRules{('batch', 'batch'), ('experts', 'batch'), ('heads', 'model'), ('d_ff', 'model'), ('ensemble', 'ensemble'), ('vocab', 'model')}
INFO:tensorflow:Device Assignment: <tensorflow.python.tpu.device_assignment.DeviceAssignment object at 0x7f9a171d5a50>
INF