# Pretraining Data Generation Script

This notebook processes tsv data and uploads the processed data to GCS to be used for pretraining MutFormer.

Note: Run multiple copies of this notebook in multiple VMs to generate data in parallel for multiple models

* Note: TO ACCESS ANY BUCKET WITH PERMISSION TO VIEW: go to this address: https://console.cloud.google.com/storage/browser/(BUCKET_NAME)



# Downgrade TensorFlow (most likely requires a runtime restart if using a Colab runtime)

In [None]:
# Pin TensorFlow to 1.15 (shell magic; restart the runtime afterwards if Colab asks for it).
!pip install tensorflow==1.15

# Configure settings

In [None]:
# Notebook-wide settings. Lines ending in `#@param` are Colab form fields and
# `#@markdown` lines are the text shown next to them in the form.
#@markdown ###General Config
#@markdown Whether or not this script is being run in a GCP runtime (if more memory is required for large databases)
GCP_RUNTIME = False #@param {type:"boolean"}
#@markdown Name of the GCS bucket to use:
BUCKET_NAME = "theodore_jiang" #@param {type:"string"}
BUCKET_PATH = "gs://"+BUCKET_NAME
#@markdown Because of dynamic masking, which requires generating a new dataset for each train epoch, the train, eval, and test set are written into separate folders.
#@markdown * Folder in GCS to store pretraining data in (dynamic masking generation writes multiple datasets as subfolders inside of this folder):
DATA_DIR = "pretraining_data_1024_embedded_mutformer" #@param {type:"string"}
#@markdown \
#@markdown 
#@markdown 
#@markdown ###Data Config
#@markdown Maximum number of datasets to keep at a time (make sure this value is the same in the training script):
DATA_COPIES = 20 #@param {type:"integer"}
#@markdown Maximum output data sequence length:
MAX_SEQ_LENGTH =  1024#@param {type:"integer"}
#@markdown For the masked LM task, a certain number of amino acids per sequence are masked. This number will be determined below by either a probability or a fixed number of masks, whichever number is lower.
#@markdown * What probability to use for masking amino acids:
MASKED_LM_PROB = 0.15 #@param
#@markdown * What fixed max number of masked amino acids to use:
MAX_PREDICTIONS = 20 #@param {type:"integer"}

# DATA_INFO is dumped as info.json next to every generated dataset (eval, test
# and each train copy) so the parameters used to build it travel with the data.
DATA_INFO = {      ##dictionary that will be uploaded alongside each dataset to indicate its parameters
      "sequence_length":MAX_SEQ_LENGTH,
      "max_num_predictions":MAX_PREDICTIONS,
      "max_masked_prob":MASKED_LM_PROB
}
#### Vocabulary for the model (MutFormer uses the vocabulary below) ([PAD]
#### [UNK],[CLS],[SEP], and [MASK] are necessary default tokens; B and J
#### are markers for the beginning and ending of a protein sequence,
#### respectively; the rest are all amino acids possible, ranked 
#### approximately by frequency of occurrence in human population)
#### vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
vocab = \
'''[PAD]
[UNK]
[CLS]
[SEP]
[MASK]
L
S
B
J
E
A
P
T
G
V
K
R
D
Q
I
N
F
H
Y
C
M
W'''
##write the vocabulary to vocab.txt, one token per line (every line newline-terminated)
with open("vocab.txt", "w") as vocab_file:
  vocab_file.writelines(entry+"\n" for entry in vocab.split("\n"))

# If using a GCP runtime to generate data (if the database is large and more memory is needed), follow these instructions to set it up

### 1) Create a VM from the GCP website
### 2) Open a command prompt on your computer and perform the following steps:
To ssh into the VM, run:

```
gcloud beta compute ssh --zone <COMPUTE ZONE> <VM NAME> --project <PROJECT NAME> -- -L 8888:localhost:8888
```

Note: Make sure the port above matches the port below (in this case it's 8888)
\
\
In the new command prompt that popped out, either run each of the commands below individually, or copy and paste the one liner below:
```
sudo apt-get update
sudo apt-get -y install python3 python3-pip
sudo apt-get install pkg-config
sudo apt-get install libhdf5-serial-dev
sudo apt-get install libffi6 libffi-dev
sudo -H pip3 install jupyter tensorflow==1.14 google-api-python-client tqdm
sudo -H pip3 install jupyter_http_over_ws
jupyter serverextension enable --py jupyter_http_over_ws
jupyter notebook   --NotebookApp.allow_origin='https://colab.research.google.com'   --port=8888   --NotebookApp.port_retries=0   --no-browser
```
One command:
```
sudo apt-get update ; sudo apt-get -y install python3 python3-pip ; sudo apt-get install pkg-config ; sudo apt-get -y install libhdf5-serial-dev ; sudo apt-get install libffi6 libffi-dev; sudo -H pip3 install jupyter tensorflow==1.14 google-api-python-client tqdm ; sudo -H pip3 install jupyter_http_over_ws ; jupyter serverextension enable --py jupyter_http_over_ws ; jupyter notebook   --NotebookApp.allow_origin='https://colab.research.google.com'   --port=8888   --NotebookApp.port_retries=0   --no-browser
```
### 3) In this notebook, click the "connect to local runtime" option under the connect button, and copy and paste the link output by the command prompt that contains "localhost: ..."

# Clone the MutFormer repo

In [None]:
if GCP_RUNTIME:
  !sudo apt-get -y install git-all
#@markdown Where to clone the repo into:
REPO_DESTINATION_PATH = "mutformer" #@param {type:"string"}
import os,shutil
if not os.path.exists(REPO_DESTINATION_PATH):
  os.makedirs(REPO_DESTINATION_PATH)
else:
  shutil.rmtree(REPO_DESTINATION_PATH)
  os.makedirs(REPO_DESTINATION_PATH)
cmd = "git clone https://github.com/WGLab/mutformer.git \"" + REPO_DESTINATION_PATH + "\""
!{cmd}

# Imports/Authenticate for GCP

In [None]:
# Interactive gcloud authentication is only performed when GCP_RUNTIME is False.
if not GCP_RUNTIME:
  def authenticate_user(): ##authentication function that uses link authentication instead of popup
    """Run the two gcloud login flows unless application-default credentials already exist.

    Returns immediately when /content/.config/application_default_credentials.json
    is present; otherwise runs `gcloud auth login` followed by
    `gcloud auth application-default login`, both with --no-launch-browser.
    """
    if os.path.exists("/content/.config/application_default_credentials.json"): 
      return
    print("Authorize for runtime GCS:")
    !gcloud auth login --no-launch-browser
    print("Authorize for TPU GCS:")
    !gcloud auth application-default login  --no-launch-browser
  authenticate_user()

import sys
import json
import random
import logging
import tensorflow.compat.v1 as tf
import re
import time
import os
import shutil
import random
import importlib

##the model code is copied into a local folder named "mutformer"; when the repo
##itself was cloned into "mutformer", move a copy of it aside first so the
##clone is not lost when that folder is replaced below
if REPO_DESTINATION_PATH == "mutformer":
  if os.path.exists("mutformer_code"):
    shutil.rmtree("mutformer_code")
  shutil.copytree(REPO_DESTINATION_PATH,"mutformer_code")
  REPO_DESTINATION_PATH = "mutformer_code"

##replace any previous "mutformer" folder with a fresh copy of the model code
if os.path.exists("mutformer"):
  shutil.rmtree("mutformer")
shutil.copytree(REPO_DESTINATION_PATH+"/mutformer_model_code","mutformer")

##move the "mutformer" entry to the end of sys.path
if "mutformer" in sys.path:
  sys.path.remove("mutformer")
sys.path.append("mutformer")

from mutformer import tokenization

##reload modules so that you don't need to restart the runtime to reload modules in case that's needed
modules2reload = [tokenization]
for loaded_module in modules2reload:
    importlib.reload(loaded_module)

# Specify Input Data location/Mount Drive if needed

In [None]:
# `drive` is only imported outside a GCP runtime. NOTE(review): with
# GCP_RUNTIME = True and a "/content/drive" input path, the mount branch below
# would hit a NameError on `drive` — the form text requires a GCS path in that case.
if not GCP_RUNTIME:
  from google.colab import drive
#@markdown Input pretraining data folder: data will be read from here to be processed and uploaded to GCS (can be a drive path or a GCS path; must be a GCS path if using GCP_RUNTIME):
INPUT_DATA_FOLDER = "gs://theodore_jiang/gcs_pretraining_data" #@param {type: "string"}
# Remount Google Drive from a clean state when the input lives on Drive.
if "/content/drive" in INPUT_DATA_FOLDER:
  !fusermount -u /content/drive
  drive.flush_and_unmount()
  drive.mount('/content/drive', force_remount=True)
  # DRIVE_PATH is not referenced by any other cell visible in this notebook.
  DRIVE_PATH = "/content/drive/My Drive"

# Data Generation

As part of the data generation process, this script:
1. Creates shards for the eval, test, and train sets: to conserve memory during data generation, the data is split into multiple shards that are processed and uploaded one at a time.
2. Generates the eval and test sets:
  * Eval and test sets do not need to be dynamically masked, so they are generated once into separate folders inside GCS 
3. Generates the train set repeatedly for dynamic masking:
  * The script generates DATA_COPIES datasets using the same input train data, but with each epoch containing data with different amino acid masking/alterations, which prevents overfitting during pretraining

## Data preparation (make shards)

In [None]:
# Sharding options read by make_shards().
#@markdown Whether or not to store shards into GCS (useful for large databases)
GCS_shards = False #@param {type:"boolean"}
#@markdown If using a data from google drive, size of a single chunk/shard of data (in terms of lines/datatpoints)
# NOTE(review): chunk_size_gd is not referenced by any code visible in this notebook;
# make_shards() sizes its chunks with chunk_size_gcs for both GCS and local inputs.
chunk_size_gd = 256000 #@param {type:"number"}
#@markdown If using a data from GCS, size of a single chunk/shard of data (in terms of bytes)(the code segment will take care of abnormal line cutoffs)
chunk_size_gcs =  500e6 #@param {type:"number"}
# The form yields a float (500e6); byte offsets must be integers.
chunk_size_gcs = int(chunk_size_gcs)

def make_shards(dataset):    
  print("Generating shards for "+dataset+":\n")
  if os .path.exists("./shards_tmp_"+dataset): ##data will be written as shards to prevent one single files from getting too large
    shutil.rmtree("./shards_tmp_"+dataset)
  os.makedirs("./shards_tmp_"+dataset)

  if os .path.exists("./shards_"+dataset): 
    shutil.rmtree("./shards_"+dataset)
  os.makedirs("./shards_"+dataset)
  
  if GCS_shards:
    print("removing existing data if it exists...")
    cmd = "gsutil -m rm -r "+INPUT_DATA_FOLDER+"/shards_"+dataset+""
    !{cmd}
  start = 0
  end = chunk_size_gcs
  previous_truncated = ""
  i=0
  while True:
    print("Processing shard "+str(i))
    ##download the selected portion of the input file
    if "gs://" in INPUT_DATA_FOLDER:
      cmd = "gsutil cat -r "+str(start)+"-"+str(end)+" "+INPUT_DATA_FOLDER+"/"+dataset+".txt"+" | gsutil -q cp - ./shards_tmp_"+dataset+"/shard_tmp"
      !{cmd}
    else:
      cmd = "cat -r "+str(start)+"-"+str(end)+" "+INPUT_DATA_FOLDER+"/"+dataset+".txt"+" | gsutil -q cp - ./shards_tmp_"+dataset+"/shard_tmp"
      !{cmd}
    ##get the line count
    cmd = "wc -l <./shards_tmp_"+dataset+"/shard_tmp"
    line_count = !{cmd}
    line_count = int(line_count[0])
    
    ##get the actual byte count
    cmd = "wc -c <./shards_tmp_"+dataset+"/shard_tmp"
    byte_count = !{cmd}
    byte_count = int(byte_count[0])
    if line_count == 0:
      print("(Ignore the previous ServiceException.) finished after processing "+str(i+1)+" shards... appending the last truncated line to the end and continuing\n\n")
      i-=1
      break
    print("processing",line_count,"lines...")
    ##get the last few lines of the downloaded file
    cmd = "dd  if=./shards_tmp_"+dataset+"/shard_tmp ibs=1 skip="+str(byte_count-MAX_SEQ_LENGTH*2)+" count="+str(MAX_SEQ_LENGTH*2)+" status=none > previous_tcd.txt"
    !{cmd}
    ##truncate off the last line
    cmd = "sed -ni \'"+str(1)+","+str(line_count)+"p;"+str(line_count)+"q\' ./shards_tmp_"+dataset+"/shard_tmp"
    !{cmd}

    ##add the previously truncated line to the front of the file
    cmd = "sed -i \'1s/^/"+previous_truncated+" /\' ./shards_tmp_"+dataset+"/shard_tmp >garbage.txt"
    !{cmd}
    ##get the last line, which just got truncated, but will be added to the front of the next shard
    previous_truncated = open("previous_tcd.txt").read().split("\n")[-1]
    ##copy data to GCS
    
    if GCS_shards:
      print("Uploading to GCS...\n")
      cmd = "gsutil -q cp ./shards_tmp_"+dataset+"/shard_tmp "+INPUT_DATA_FOLDER+"/shards_"+dataset+"/shard_"+str(i)
      !{cmd}
    else:
      cmd = "cp ./shards_tmp_"+dataset+"/shard_tmp ./shards_"+dataset+"/shard_"+str(i)
      !{cmd}

    start+=chunk_size_gcs
    end+=chunk_size_gcs
    i+=1
  ##appending the last truncated line to the end of the last file

  if GCS_shards:
    cmd = "gsutil -q cp "+INPUT_DATA_FOLDER+"/shards_"+dataset+"/shard_"+str(i)+" ./shards_tmp_"+dataset+"/shard_tmp"
    !{cmd}
    with open("./shards_tmp_"+dataset+"/shard_tmp","a") as writer:
      writer.write(previous_truncated)
    cmd = "gsutil cp ./shards_tmp_"+dataset+"/shard_tmp "+INPUT_DATA_FOLDER+"/shards_"+dataset+"/shard_"+str(i)
    !{cmd}
  else:
    cmd = "cp ./shards_"+dataset+"/shard_"+str(i)+" ./shards_tmp_"+dataset+"/shard_tmp"
    !{cmd}
    with open("./shards_tmp_"+dataset+"/shard_tmp","a") as writer:
      writer.write(previous_truncated)
    cmd = "cp ./shards_tmp_"+dataset+"/shard_tmp ./shards_"+dataset+"/shard_"+str(i)
    !{cmd}
  if GCS_shards:
    data_dir = INPUT_DATA_FOLDER+"/shards_"+dataset
  else:
    data_dir = "./shards_"+dataset
  return data_dir
# Shard every split once; the returned shard directories are the inputs of generate_data() below.
input_train_dir = make_shards("train")
input_eval_dir = make_shards("eval")
input_test_dir = make_shards("test")

## Define Data Generation Op

In [None]:
def generate_data(input_dir,dir):
  seed = random.randrange(sys.maxsize)
  input_files = ",".join([input_dir+"/"+file for file in tf.io.gfile.listdir(input_dir)])
  out_files = ",".join([dir+"/"+file+".tfrecord" for file in tf.io.gfile.listdir(input_dir)])
  print("input_files:",input_files,"output_files:",out_files)

  XARGS_CMD = (f"python3 mutformer/create_pretraining_data.py "
              f"--input_file={input_files} "
              f"--output_file={out_files} "
              f"--vocab_file=vocab.txt "
              f"--do_lower_case=False "
              f"--max_predictions_per_seq={MAX_PREDICTIONS} "
              f"--max_seq_length={MAX_SEQ_LENGTH} "
              f"--masked_lm_prob={MASKED_LM_PROB} "
              f"--random_seed={seed} "
              f"--dupe_factor=1")
  
  if os.path.exists(dir):
    shutil.rmtree(dir)
  os.makedirs(dir)
  
  !$XARGS_CMD

## Eval and Test Data Generation

In [None]:
EVAL_DIR = f"{DATA_DIR}/eval"
TESTING_DIR = f"{DATA_DIR}/test"

##Eval set
if os.path.exists(EVAL_DIR):
  shutil.rmtree(EVAL_DIR)
os.makedirs(EVAL_DIR)
generate_data(input_eval_dir,EVAL_DIR)
with open(EVAL_DIR+"/info.json","w+") as out: ##writes out the dictionary containing
      json.dump(DATA_INFO,out,indent=2)                   ##the dataset's parameters

##Test set
if os.path.exists(TESTING_DIR):
  shutil.rmtree(TESTING_DIR)
os.makedirs(TESTING_DIR)
generate_data(input_test_dir,TESTING_DIR)
with open(TESTING_DIR+"/info.json","w+") as out: 
      json.dump(DATA_INFO,out,indent=2)                  

cmd="gsutil -m cp -r "+DATA_DIR +" gs://"+BUCKET_NAME
!{cmd}

## Repeated Parallel Training Data Generation (for dynamic masking)

In [None]:
#@markdown When finished generating all DATA_COPIES datasets, how long to wait before checking again if the data has been used (to minimize interaction with GCS, should be around the same time it takes for the mode train script to train 1 model):
CHECK_DATA_EVERY_N_SECS = 1200 #@param {type:"integer"}

TRAIN_DIR = f"{DATA_DIR}/train"

while True:
  try:
    available_indexes = tf.io.gfile.listdir(BUCKET_PATH+"/"+TRAIN_DIR) ##get all currently 
  except Exception:                                                    ##generated datasets
    available_indexes = []

  print("Already generated datasets:",available_indexes)
   
  for i in range(0,DATA_COPIES):                                     
    if os.path.exists(TRAIN_DIR):
      shutil.rmtree(TRAIN_DIR)
    
    os.makedirs(TRAIN_DIR)
    available_indexes = [''.join([i for i in available_index if i.isdigit()]) for available_index in available_indexes]
    if str(i) not in available_indexes:
      print("Processing data for dataset number:",i)
      out_dir = TRAIN_DIR+"/"+str(i)
      print("Writing into dir:",out_dir)
      if not os.path.exists(out_dir):
        os.makedirs(out_dir)
      generate_data(input_train_dir,out_dir)
      print("\nUpdating and uploading data info json...\n")
      
      with open(out_dir+"/info.json","w+") as out: ##writes out a dictionary containing
        json.dump(DATA_INFO,out,indent=2)                   ##the dataset's parameters
      print("Data info json uploaded successfully")
      cmd="gsutil -m cp -r "+out_dir+" "+BUCKET_PATH+"/"+out_dir
      !{cmd}
      available_indexes = tf.io.gfile.listdir(BUCKET_PATH+"/"+TRAIN_DIR) ##we only have to refresh 
                                                                        ##available indexes after 
                                                                        ##each data generation 
    time.sleep(1) ##1 second sleep to prevent abnormal cloud interactions due to timing
  time.sleep(CHECK_DATA_EVERY_N_SECS)