Install all the necessary packages. Make sure to use the Databricks Machine Learning Runtime. For efficient int8 training we use bitsandbytes, which requires GPUs with 8-bit tensor core support: Turing and Ampere GPUs (RTX 20 series, RTX 30 series, A40-A100, T4 and newer). Choose GPU instances accordingly. We install PEFT from source for efficient training with LoRA, along with specific versions of the other libraries that work well together.

In [0]:
# install Hugging Face Libraries
%pip install git+https://github.com/huggingface/peft.git
%pip install "transformers==4.27.2" "datasets==2.9.0" "accelerate==0.17.1" "evaluate==0.4.0" "bitsandbytes==0.37.1" "sentencepiece" --quiet
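
Because the notebook depends on specific pinned versions, it can help to confirm what is actually installed (for example after a cluster restart). The sketch below uses only the standard library's `importlib.metadata`; the helper name `find_version_mismatches` and the `PINNED` dictionary are illustrative, not part of any library. PEFT is left out because it is installed from source without a pin.

```python
from importlib.metadata import PackageNotFoundError, version

# Pins copied from the %pip install cell in this notebook
PINNED = {
    "transformers": "4.27.2",
    "datasets": "2.9.0",
    "accelerate": "0.17.1",
    "evaluate": "0.4.0",
    "bitsandbytes": "0.37.1",
}


def find_version_mismatches(pins):
    """Return {package: installed_version} for every package that differs from its pin.

    An installed_version of None means the package is not installed at all.
    """
    mismatches = {}
    for package, expected in pins.items():
        try:
            installed = version(package)
        except PackageNotFoundError:
            installed = None
        if installed != expected:
            mismatches[package] = installed
    return mismatches


print(find_version_mismatches(PINNED) or "All pinned versions are installed")
```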

In [0]:
from datasets import concatenate_datasets
import numpy as np
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer, default_data_collator, get_linear_schedule_with_warmup
from transformers import DataCollatorForSeq2Seq
import torch

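Write a basic configuration for Accelerate and set the mixed-precision mode. Accelerate enables distributed and mixed-precision training without major changes to the training loop. Here we use bf16 mixed precision, which reduces memory use and typically speeds up training noticeably. Note that bf16 requires an Ampere or newer GPU (for example A10 or A100); on Turing GPUs such as the T4, use `mixed_precision='fp16'` instead.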
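The mixed-precision mode has to match the hardware. As a hedged sketch, the hypothetical helper below maps a CUDA compute capability `(major, minor)` to a `mixed_precision` value: bf16 needs compute capability 8.0 (Ampere) or newer, while Turing GPUs such as the T4 (7.5) support fp16 but not bf16.

```python
def choose_mixed_precision(major: int, minor: int) -> str:
    """Pick an Accelerate mixed_precision value from a CUDA compute capability."""
    # Ampere (8.0) and newer GPUs have native bfloat16 support
    if (major, minor) >= (8, 0):
        return "bf16"
    # Older GPUs such as Turing (7.5) fall back to float16
    return "fp16"


for capability in [(7, 5), (8, 0), (8, 6)]:
    print(capability, choose_mixed_precision(*capability))
```

On the cluster, `torch.cuda.get_device_capability()` returns the `(major, minor)` pair of the current GPU, so its result could be passed through this helper to `write_basic_config(mixed_precision=...)`.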

In [0]:
%sh
python -c "from accelerate.utils import write_basic_config; write_basic_config(mixed_precision='bf16')"

The configuration looks like this. Note the `num_processes` parameter: it is the number of GPUs available for training.

In [0]:
%sh
accelerate env

The dataset we use for fine-tuning looks like this:

In [0]:
%sql
SELECT * FROM
hive_metastore.default.hf_sql_dataset

Read it into a pandas DataFrame and shuffle the rows:

In [0]:
seqtext = spark.sql("SELECT * FROM hive_metastore.default.hf_sql_dataset")
seqtext_pd = seqtext.toPandas().sample(frac=1, random_state=42)

In [0]:
seqtext_pd.shape

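Define a preprocessing function. Note that each question to be translated into SQL is wrapped between the prefix "Translate English to SQL: " and the suffix ". </s>". Empirically, this format gave the best fine-tuning results, likely because, according to the original FLAN-T5 publication, the model was trained with similar prompt formatting for translation tasks. The function relies on `tokenizer`, `max_source_length` and `max_target_length`, which are defined in the cells further down.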
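The prompt template can be factored into a small helper so that training and inference format questions the same way. This is only a sketch: `build_prompt`, `build_prompts` and the `PROMPT_PREFIX`/`PROMPT_SUFFIX` constants are illustrative names, with the strings copied from `preprocess_function`.

```python
PROMPT_PREFIX = "Translate English to SQL: "
PROMPT_SUFFIX = ". </s>"


def build_prompt(question: str) -> str:
    """Wrap one question in the same template that preprocess_function uses."""
    return PROMPT_PREFIX + question + PROMPT_SUFFIX


def build_prompts(questions):
    """Batched version, mirroring the list comprehension in preprocess_function."""
    return [build_prompt(question) for question in questions]


print(build_prompt("List all users"))
```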

In [0]:
def preprocess_function(sample, padding="max_length"):
  # Add the task prefix and suffix to every input question for T5
  inputs = ["Translate English to SQL: " + item + ". </s>" for item in sample["question_formatted"]]

  # Tokenize inputs
  model_inputs = tokenizer(inputs, max_length=max_source_length, padding=padding, truncation=True)

  # Tokenize targets with the `text_target` keyword argument
  labels = tokenizer(text_target=sample["query"], max_length=max_target_length, padding=padding, truncation=True)

  # When padding to max_length, replace every tokenizer.pad_token_id in the labels with -100
  # so that padding is ignored by the loss.
  if padding == "max_length":
    labels["input_ids"] = [
      [(token if token != tokenizer.pad_token_id else -100) for token in label] for label in labels["input_ids"]
    ]

  model_inputs["labels"] = labels["input_ids"]
  return model_inputs
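
The -100 replacement works because PyTorch's cross-entropy loss skips targets equal to its `ignore_index`, which defaults to -100, and the Hugging Face T5 model computes its loss that way. The pure-Python sketch below mimics both steps on plain lists: masking pad tokens (T5's pad id is 0) and averaging per-token losses over the positions that remain. The helper names are hypothetical.

```python
import math

IGNORE_INDEX = -100  # default ignore_index of torch.nn.CrossEntropyLoss


def mask_padding(label_batch, pad_token_id):
    """Replace pad tokens in each label sequence with IGNORE_INDEX."""
    return [[tok if tok != pad_token_id else IGNORE_INDEX for tok in seq] for seq in label_batch]


def mean_loss(per_token_losses, labels):
    """Average per-token losses, skipping positions labelled IGNORE_INDEX.

    Mirrors reduction='mean' with ignore_index; returns NaN if every position is ignored.
    """
    kept = [loss for loss, label in zip(per_token_losses, labels) if label != IGNORE_INDEX]
    if not kept:
        return float("nan")
    return sum(kept) / len(kept)


masked = mask_padding([[5, 6, 1, 0, 0]], pad_token_id=0)
print(masked)
print(mean_loss([1.0, 3.0, 0.5, 9.0, 9.0], masked[0]))
```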



Choose the model size depending on the resources you have available. The fine-tuned model we have made available on the Hugging Face Hub is based on FLAN-T5-XL, but for the sake of this example we use flan-t5-base. Download the tokenizer and the model, then cache the model in DBFS so that the training function can load it from a local path instead of fetching it again when it is launched.

In [0]:
# Safe option for a first try; with a smaller model (fewer parameters), the quality of the generated SQL will suffer
model_id = "google/flan-t5-base"
# Uncomment if you have enough GPU resources
#model_id = "google/flan-t5-xl"
tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)

In [0]:
cache =  "/dbfs/FileStore/shared_uploads/avinash.sooriyarachchi@databricks.com/text_to_sql/flanbase_accelerate/cache"


In [0]:
model = AutoModelForSeq2SeqLM.from_pretrained(model_id, torch_dtype=torch.bfloat16)

In [0]:
# Save the model weights to the DBFS cache
model.save_pretrained(cache)

In [0]:
#Just testing if the model was cached properly
#model = AutoModelForSeq2SeqLM.from_pretrained(cache,  torch_dtype=torch.bfloat16)

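Split the data into training and test sets, then determine the maximum total input sequence length after tokenization. Sequences longer than this are truncated and shorter ones are padded. We take the 99th percentile of the tokenized lengths rather than the maximum, so that a few very long examples do not inflate the padding for every batch. Do the same for the target SQL strings.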

In [0]:
from datasets import Dataset, concatenate_datasets

# Hold out 2% of the examples for evaluation (seeded for reproducibility)
dataset = Dataset.from_pandas(seqtext_pd).train_test_split(test_size=0.02, seed=42)
# Drop the pandas index column carried over by Dataset.from_pandas
dataset["train"] = dataset["train"].remove_columns(["__index_level_0__"])
dataset["test"] = dataset["test"].remove_columns(["__index_level_0__"])
full_dataset = concatenate_datasets([dataset["train"], dataset["test"]])

# The maximum total input sequence length after tokenization. Measure the inputs with the same
# prefix and suffix that preprocess_function adds, so the template does not cause extra truncation.
tokenized_inputs = full_dataset.map(
  lambda x: tokenizer(["Translate English to SQL: " + q + ". </s>" for q in x["question_formatted"]], truncation=True),
  batched=True,
  remove_columns=["question_formatted", "query"],
)
input_lengths = [len(x) for x in tokenized_inputs["input_ids"]]
# Take the 99th percentile of the lengths for better utilization
max_source_length = int(np.percentile(input_lengths, 99))

# The maximum total sequence length for the target text after tokenization.
# Sequences longer than this will be truncated, shorter ones will be padded.
tokenized_targets = full_dataset.map(lambda x: tokenizer(x["query"], truncation=True), batched=True, remove_columns=["question_formatted", "query"])
target_lengths = [len(x) for x in tokenized_targets["input_ids"]]
# Take the 99th percentile of the lengths for better utilization
max_target_length = int(np.percentile(target_lengths, 99))
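
`np.percentile` uses linear interpolation between the closest ranks by default. The sketch below reproduces that rule in plain Python and adds a hypothetical helper to estimate what fraction of examples a given cutoff truncates, which at the 99th percentile should be roughly 1%. Both function names are illustrative.

```python
import math


def percentile_linear(values, q):
    """q-th percentile using linear interpolation between closest ranks
    (the same rule as numpy.percentile's default method)."""
    ordered = sorted(values)
    if not ordered:
        raise ValueError("values must be non-empty")
    position = (len(ordered) - 1) * q / 100.0
    lower = math.floor(position)
    upper = min(lower + 1, len(ordered) - 1)
    fraction = position - lower
    return ordered[lower] + (ordered[upper] - ordered[lower]) * fraction


def truncated_fraction(lengths, max_length):
    """Share of sequences longer than max_length, i.e. the ones truncation will cut."""
    lengths = list(lengths)
    return sum(1 for length in lengths if length > max_length) / len(lengths)


example_lengths = list(range(1, 101))  # made-up token lengths 1..100
cutoff = int(percentile_linear(example_lengths, 99))
print(cutoff, truncated_fraction(example_lengths, cutoff))
```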

In [0]:
max_source_length, max_target_length

In [0]:
def training_function():
  import evaluate
  import os
  import torch
  from accelerate import Accelerator
  import numpy as np
  from torch.utils.data import DataLoader
  from tqdm import tqdm

  from peft import LoraConfig, TaskType, get_peft_model, get_peft_config, get_peft_model_state_dict, prepare_model_for_int8_training 
  from peft.utils.other import fsdp_auto_wrap_policy
  import sentencepiece
  from transformers import DataCollatorForSeq2Seq
  accelerator = Accelerator()



  processed_datasets = dataset.map(
            preprocess_function,
            batched=True,
            num_proc=1,
            remove_columns=["question_formatted", "query"],
            load_from_cache_file=False,
            desc="Running tokenizer on dataset",
        )

  train_dataset = processed_datasets["train"]
  eval_dataset = processed_datasets["test"]

  # LoRA: train low-rank adapters (rank r=16, scaling lora_alpha=32) on the attention query ("q")
  # and value ("v") projections while the base model weights stay frozen
  lora_config = LoraConfig(
            r=16,
            lora_alpha=32,
            target_modules=["q", "v"],
            lora_dropout=0.05,
            bias="none",
            task_type=TaskType.SEQ_2_SEQ_LM
            )
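  # Load the base model from the DBFS cache created earlier (the notebook-level `cache` path)
  # instead of downloading it from the Hub again
  model = AutoModelForSeq2SeqLM.from_pretrained(cache, torch_dtype=torch.bfloat16)
  # Note: the model is loaded in bfloat16 here, not in 8-bit. For int8 training with bitsandbytes,
  # load it with load_in_8bit=True instead.
  model = prepare_model_for_int8_training(model)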
  model = get_peft_model(model, lora_config)

  # we want to ignore tokenizer pad token in the loss
  label_pad_token_id = -100
  # Data collator
  data_collator = DataCollatorForSeq2Seq(
      tokenizer,
      model=model,
      label_pad_token_id=label_pad_token_id,
      pad_to_multiple_of=8
  )

  batch_size = 8
  train_dataloader = DataLoader(train_dataset, shuffle=True, collate_fn=data_collator, batch_size=batch_size, pin_memory=True)

  eval_dataloader = DataLoader(eval_dataset, collate_fn=data_collator, batch_size=batch_size, pin_memory=True)
    

  # A lower learning rate is generally a safer starting point
  lr = 1e-4
  #2-5 Epochs may be sufficient initially
  num_epochs = 4
  

  optimizer = torch.optim.AdamW(model.parameters(), lr=lr)
  lr_scheduler = get_linear_schedule_with_warmup(
      optimizer=optimizer,
      num_warmup_steps=0,
      num_training_steps=(len(train_dataloader) * num_epochs),
  )

  
  model, train_dataloader, eval_dataloader, optimizer, lr_scheduler = accelerator.prepare(
      model, train_dataloader, eval_dataloader, optimizer, lr_scheduler
  )
  accelerator.print(model)
  # Instantiate a progress bar to keep track of training. Note that we only enable it on the main process.
  progress_bar = tqdm(range(num_epochs * len(train_dataloader)), disable=not accelerator.is_main_process)
  # Now we train the model
  for epoch in range(num_epochs):
      model.train()
      total_loss = 0
      for step, batch in enumerate(train_dataloader):
          outputs = model(**batch)
          loss = outputs.loss
          total_loss += loss.detach().float()
          # accelerator.backward lets Accelerate handle mixed precision and distributed training
          accelerator.backward(loss)
          optimizer.step()
          lr_scheduler.step()
          optimizer.zero_grad()
          progress_bar.update(1)

      model.eval()
      eval_loss = 0
      #eval_preds = []
      #eval_refs = []
      for step, batch in enumerate(tqdm(eval_dataloader)):
          with torch.no_grad():
              outputs = model(**batch)
          loss = outputs.loss
          eval_loss += loss.detach().float()
          #preds = accelerator.gather_for_metrics(torch.argmax(outputs.logits, -1)).detach().cpu().numpy()
          #eval_preds.extend(tokenizer.batch_decode(preds, skip_special_tokens=True))
          #refs = accelerator.gather_for_metrics(batch["labels"].detach().cpu().numpy())
          #items=[[item] for item in tokenizer.batch_decode(refs, skip_special_tokens=True)]
          #eval_refs.extend(items)
          
      eval_epoch_loss = eval_loss / len(eval_dataloader)
      eval_ppl = torch.exp(eval_epoch_loss)
      train_epoch_loss = total_loss / len(train_dataloader)
      train_ppl = torch.exp(train_epoch_loss)
      accelerator.print(f"{epoch=}: {train_ppl=} {train_epoch_loss=} {eval_ppl=} {eval_epoch_loss=}")
      accelerator.wait_for_everyone()
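      # Save the LoRA adapter to persistent storage so that it can be retrieved, tested and deployed later
      base_dir = '<base-dir>'
      model_save = '<model-name>'
      save_path = base_dir + model_save
      # accelerator.unwrap_model works whether or not the model was wrapped (e.g. in DDP);
      # model.module only exists when it was wrapped
      unwrapped_model = accelerator.unwrap_model(model)
      state_dict = accelerator.get_state_dict(model)
      if accelerator.is_main_process:
          unwrapped_model.save_pretrained(save_path, state_dict=state_dict)
      accelerator.wait_for_everyone()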
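
The training function uses `get_linear_schedule_with_warmup` with `num_warmup_steps=0` and `num_training_steps=len(train_dataloader) * num_epochs`, so the learning rate falls linearly from 1e-4 to zero over the run. The sketch below re-implements that multiplier in plain Python to make the schedule concrete; it follows the rule described in the Transformers documentation rather than the library source, and the function name is hypothetical.

```python
def linear_warmup_decay_multiplier(step, num_warmup_steps, num_training_steps):
    """Learning-rate multiplier: linear warmup from 0 to 1, then linear decay to 0.

    The effective learning rate at `step` is base_lr * multiplier.
    """
    if step < num_warmup_steps:
        return step / max(1, num_warmup_steps)
    remaining = num_training_steps - step
    return max(0.0, remaining / max(1, num_training_steps - num_warmup_steps))


base_lr = 1e-4
total_steps = 100  # hypothetical: len(train_dataloader) * num_epochs
for step in (0, 25, 50, 75, 100):
    print(step, base_lr * linear_warmup_decay_multiplier(step, 0, total_steps))
```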


In [0]:
from accelerate import notebook_launcher

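# num_processes is the number of processes (GPUs) to launch; see `accelerate env` for what is available
notebook_launcher(training_function, num_processes=1)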
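
Each epoch prints `train_ppl` and `eval_ppl`, computed as the exponential of the mean loss. The sketch below mirrors that arithmetic with a hypothetical `perplexity` helper. As in the training loop, it averages per-batch losses (each already a mean over that batch's tokens), so it is not weighted by the number of tokens per batch. A perplexity of 1.0 would mean the model assigns probability 1 to every target token.

```python
import math


def perplexity(batch_losses):
    """exp(mean of per-batch losses), as computed for train_ppl / eval_ppl."""
    if not batch_losses:
        raise ValueError("need at least one batch loss")
    mean_loss = sum(batch_losses) / len(batch_losses)
    return math.exp(mean_loss)


# A mean loss of ln(2) corresponds to perplexity 2: as uncertain as a fair coin per token
print(perplexity([math.log(2)] * 3))
```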

### Inference testing

Redefine the paths and reload the model here if you want to try out inference separately from training.

In [0]:
from peft import PeftConfig, PeftModel

base_dir = '<base-dir>'
model_save = '<model-name>'
peft_model_id = base_dir + model_save
config = PeftConfig.from_pretrained(peft_model_id)
# Load the base model from the cache, then attach the trained LoRA adapter
model = AutoModelForSeq2SeqLM.from_pretrained(cache)
model = PeftModel.from_pretrained(model, peft_model_id)

In [0]:
input_text = "Translate English to SQL: How many people living in Manhattan make more than 100000 a year?, schema: wages (ID INTEGER, city STRING, state STRING, salary INTEGER)"
inputs = tokenizer(input_text, return_tensors="pt")
outputs = model.generate(input_ids=inputs["input_ids"], max_new_tokens=100)
print(tokenizer.batch_decode(outputs.detach().cpu().numpy(), skip_special_tokens=True)[0])

### Log with MLflow, wrap in a custom Python function with an inference example, and deploy with Databricks Model Serving

Create an inference example

In [0]:
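import pandas as pd

# Single-row DataFrame with a 'text' column, used as the model's input example
payload_pd = pd.DataFrame([[input_text]], columns=['text'])
input_example = payload_pd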

Note that during fine-tuning we only train a small number of parameters and save them as a LoRA adapter, which is attached to the layers of the base model for inference and deployment. The adapter therefore has to be packaged into the pyfunc together with the pretrained base model.
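To get a feel for how small the adapter is, the back-of-the-envelope sketch below counts the parameters LoRA adds for the configuration used here (`r=16`, `target_modules=["q", "v"]`). The flan-t5-base shapes (d_model 768, 12 encoder and 12 decoder layers, 768 x 768 query/value projections) are assumptions to check against `model.config`; on the actual `PeftModel`, `model.print_trainable_parameters()` reports the real count. The helper name is hypothetical.

```python
def lora_trainable_params(d_in: int, d_out: int, r: int) -> int:
    """Extra trainable parameters LoRA adds to one d_out x d_in weight matrix.

    The adapter is a pair of matrices A (r x d_in) and B (d_out x r).
    """
    return r * (d_in + d_out)


# Assumed flan-t5-base dimensions (verify against model.config):
# d_model = 768 and the q/v projections are 768 x 768 (num_heads * d_kv = 12 * 64).
D_MODEL = 768
RANK = 16  # LoraConfig(r=16) in training_function

# Assumed attention blocks containing "q" and "v": 12 encoder self-attention,
# 12 decoder self-attention and 12 decoder cross-attention blocks.
NUM_ATTENTION_BLOCKS = 12 + 12 + 12
MATRICES_PER_BLOCK = 2  # target_modules=["q", "v"]

per_matrix = lora_trainable_params(D_MODEL, D_MODEL, RANK)
total = NUM_ATTENTION_BLOCKS * MATRICES_PER_BLOCK * per_matrix
share_of_one_matrix = per_matrix / (D_MODEL * D_MODEL)

print(f"LoRA parameters per matrix: {per_matrix}")
print(f"LoRA parameters in total: {total}")
print(f"Size relative to one frozen 768 x 768 matrix: {share_of_one_matrix:.1%}")
```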

In [0]:
tokenizer_path = '/tmp/tokenizer/'
pretrained_model_path = cache
lora_adapter_path = base_dir + model_save
tokenizer.save_pretrained(tokenizer_path)

# Local paths that MLflow copies into the logged model; load_context() looks them up
# again by these keys through context.artifacts
artifacts = {
  "tokenizer_path": tokenizer_path,
  "pretrained_model_path": pretrained_model_path,
  "lora_adapter_path": lora_adapter_path,
}

In [0]:
import mlflow.pyfunc

class TextToSQL(mlflow.pyfunc.PythonModel):
  def load_context(self, context):
    from peft import PeftModel, PeftConfig
    from transformers import AutoModelForSeq2SeqLM, AutoTokenizer
    self.config = PeftConfig.from_pretrained(context.artifacts["lora_adapter_path"])
    self.tokenizer = AutoTokenizer.from_pretrained(context.artifacts["tokenizer_path"])
    # Load the base model, then attach the LoRA adapter on top of it
    self.model_pretrained = AutoModelForSeq2SeqLM.from_pretrained(context.artifacts["pretrained_model_path"])
    self.model = PeftModel.from_pretrained(self.model_pretrained, context.artifacts["lora_adapter_path"])

  def predict(self, context, model_input):
    import json
    question = model_input.iloc[:, 0].to_list()[0]  # first row of the first column

    inputs = self.tokenizer(question, return_tensors="pt")

    outputs = self.model.generate(input_ids=inputs["input_ids"], max_new_tokens=300)
    generated_sql = self.tokenizer.batch_decode(outputs.detach().cpu().numpy(), skip_special_tokens=True)[0]

    result = {'code': generated_sql}
    return json.dumps(result)

In [0]:
from sys import version_info
 
PYTHON_VERSION = "{major}.{minor}.{micro}".format(major=version_info.major,
                                                  minor=version_info.minor,
                                                  micro=version_info.micro)

In [0]:
import cloudpickle
conda_env = {
    'channels': ['defaults'],
    'dependencies': [
      'python={}'.format(PYTHON_VERSION),
      'pip',
      {
        'pip': [
          'mlflow',
          'transformers==4.27.2',
          "datasets==2.9.0",
          "accelerate==0.17.1",
          "evaluate==0.4.0",
          "bitsandbytes==0.37.1",
          "peft",
          'pandas',
          "sentencepiece",
          'cloudpickle=={}'.format(cloudpickle.__version__),
          'torch'],
      },
    ],
    'name': 'code_env'
}

mlflow_pyfunc_model_path = "flant5base_text_to_sql"

In [0]:
mlflow.pyfunc.log_model(artifact_path=mlflow_pyfunc_model_path, python_model=TextToSQL(), artifacts=artifacts, conda_env=conda_env, input_example=input_example)

Once the model is logged, it can be registered in the MLflow Model Registry, versioned, tested, transitioned to the deployment stage, and deployed by following the instructions here: https://docs.databricks.com/machine-learning/model-serving/create-manage-serving-endpoints.html
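After deployment, a client sends questions to the endpoint as JSON. The sketch below builds a request body in MLflow's `dataframe_split` format, assuming the serving endpoint accepts that format; the column name matches the logged input example (`text`). The helper name is hypothetical, and because `TextToSQL.predict` reads only the first row of the first column, only the first question in a request is translated.

```python
import json


def build_serving_payload(questions, column="text"):
    """Build a request body in MLflow's `dataframe_split` JSON format.

    Assumption: the serving endpoint accepts `dataframe_split` input, and the
    column name matches the logged input example ('text').
    """
    return json.dumps({
        "dataframe_split": {
            "columns": [column],
            "data": [[question] for question in questions],
        }
    })


payload = build_serving_payload([
    "Translate English to SQL: How many people living in Manhattan make more than 100000 a year?, "
    "schema: wages (ID INTEGER, city STRING, state STRING, salary INTEGER)"
])
print(payload)
```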