### Multi-node distributed inference of *Hugging Face* models with *PyTorch* and *Pandas UDF*, optimized with PyTorch's *BetterTransformer*

##### Here we perform model inference using a fine-tuned NLP model from [*Hugging Face*](https://huggingface.co/). We use [*Pandas user-defined functions*](https://spark.apache.org/docs/latest/api/python/user_guide/sql/arrow_pandas.html#pandas-udfs-a-k-a-vectorized-udfs) on *Spark* to distribute the inference across data partitions and run it in parallel.

##### We speed up the model inference by using PyTorch’s [*BetterTransformer*](https://pytorch.org/blog/a-better-transformer-for-fast-transformer-encoder-inference), which accelerates some computations in the *Transformer* layers of the model.

##### Import the necessary packages.

In [1]:
import os
import time
import glob
from datetime import datetime

import pandas as pd

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType, IntegerType, ArrayType, StructType, StructField
from pyspark.sql.functions import mean

import torch
from torch.utils.data import Dataset, DataLoader

from transformers import AutoConfig, AutoTokenizer, AutoModelForSequenceClassification

from optimum.bettertransformer import BetterTransformer

from notebookutils import mssparkutils



##### Read the prepared test data from the *Lakehouse* into a *Spark DataFrame*. Because the data is relatively small, we explicitly repartition it so that the *Pandas UDF* used later runs on several partitions in parallel.

In [2]:
# prepared_data_dir_spark = 'Files/prepared_data' # we could use this relative path for Spark access, if the Lakehouse was set as default for the Notebook
prepared_data_dir_spark = 'abfss://<YOUR FABRIC WORKSPACE NAME>@msit-onelake.dfs.fabric.microsoft.com/<YOUR FABRIC LAKEHOUSE NAME>.Lakehouse/Files/prepared_data'
num_partitions = 8

sdf_test = spark.read.parquet(os.path.join(prepared_data_dir_spark, 'tokens_test_data')).repartition(num_partitions)
display(sdf_test.limit(10))
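
##### As a rough illustration of what the repartition achieves, the plain-Python sketch below deals rows into 8 partitions in round-robin fashion, which is approximately how Spark balances rows when `repartition(n)` is called without columns. The helper `round_robin_partitions` and the row count of 1,000 are illustrative assumptions, not Spark API.

```python
def round_robin_partitions(rows, num_partitions):
    """Deal rows into num_partitions lists, one row at a time (illustrative only)."""
    partitions = [[] for _ in range(num_partitions)]
    for i, row in enumerate(rows):
        partitions[i % num_partitions].append(row)
    return partitions


# Assumed size: roughly 1,000 test rows, split into 8 partitions.
parts = round_robin_partitions(list(range(1000)), num_partitions=8)
partition_sizes = [len(p) for p in parts]
print(partition_sizes)  # [125, 125, 125, 125, 125, 125, 125, 125]
```

##### Each partition becomes a separate task, so the UDF is invoked on roughly equal slices of the data.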


##### We create a custom PyTorch [*Dataset*](https://pytorch.org/tutorials/beginner/data_loading_tutorial.html) class representing our prepared test data. This *Dataset* will later be used for creating a PyTorch *DataLoader*, which provides batches of data for our model inference computation.

In [3]:
class TestDataset(Dataset):  
    def __init__(self, input_ids, attention_masks):  
        self.input_ids = input_ids  
        self.attention_masks = attention_masks  
  
    def __len__(self):  
        return len(self.input_ids)  
  
    def __getitem__(self, index):   
        input_id = torch.tensor(self.input_ids.iloc[index], dtype=torch.long)  
        attention_mask = torch.tensor(self.attention_masks.iloc[index], dtype=torch.long)  
  
        return {
            'input_ids': input_id,  
            'attention_mask': attention_mask  
        }
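
##### To make the batching concrete, here is a dependency-free sketch of how a sequential (non-shuffled) loader walks a dataset in fixed-size batches, ending with a smaller final batch. `iter_batches` is a hypothetical stand-in for a *DataLoader* with `shuffle=False`; the 125 rows and the batch size of 32 are assumed values for illustration.

```python
def iter_batches(num_rows, batch_size):
    """Yield lists of row indices in order, like a sequential, non-shuffled loader."""
    for start in range(0, num_rows, batch_size):
        yield list(range(start, min(start + batch_size, num_rows)))


batch_lengths = [len(b) for b in iter_batches(num_rows=125, batch_size=32)]
print(batch_lengths)  # [32, 32, 32, 29]
```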


##### Here we mount a *Lakehouse* for local access through Python. This wouldn’t be necessary if the *Lakehouse* were set as the default for this Notebook, since in that case one could access it through the *File API path*.

In [4]:
mssparkutils.fs.mount( 
 'abfss://<YOUR FABRIC WORKSPACE NAME>@msit-onelake.dfs.fabric.microsoft.com/<YOUR FABRIC LAKEHOUSE NAME>.Lakehouse/Files/', 
 '/lakehouse'
)


True

##### Here we instantiate a pre-trained *RoBERTa* model and load into it the weights that were saved in the *Lakehouse* during fine-tuning.

In [5]:
hidden_dropout_prob = 0.
attention_probs_dropout_prob = 0.
num_labels = 2
model_type = 'roberta-base'
# model_folder = '/lakehouse/default/Files/model_outputs_fsdp' # we could use this default mounted path, if the Lakehouse was set as default for the Notebook
model_folder = os.path.join(mssparkutils.fs.getMountPath('/lakehouse'), 'model_outputs_fsdp')
batch_size = 32

config = AutoConfig.from_pretrained(model_type, hidden_dropout_prob=hidden_dropout_prob,
                                    attention_probs_dropout_prob=attention_probs_dropout_prob, num_labels=num_labels)
model = AutoModelForSequenceClassification.from_pretrained(model_type, config=config)

model_path = glob.glob(model_folder + '/roberta-base-finetuned_*')[0]
model.load_state_dict(torch.load(model_path, map_location=torch.device('cpu')))
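
##### Note that `glob.glob(...)[0]` raises an `IndexError` when nothing matches and returns matches in arbitrary order when several files do. A slightly more defensive variant is sketched below. `find_checkpoint` is a hypothetical helper, picking the lexicographically last match is just one possible policy, and the temporary folder and file name only stand in for the real model folder.

```python
import glob
import os
import tempfile


def find_checkpoint(folder, pattern='roberta-base-finetuned_*'):
    """Return the lexicographically last match, or fail with a clear message."""
    matches = sorted(glob.glob(os.path.join(folder, pattern)))
    if not matches:
        raise FileNotFoundError(f'No checkpoint matching {pattern!r} in {folder}')
    return matches[-1]


# Demo against a throwaway folder (the file name suffix here is made up).
with tempfile.TemporaryDirectory() as tmp:
    open(os.path.join(tmp, 'roberta-base-finetuned_demo.pt'), 'w').close()
    found = os.path.basename(find_checkpoint(tmp))
    try:
        find_checkpoint(tmp, pattern='missing_*')
        missing_raises = False
    except FileNotFoundError:
        missing_raises = True

print(found, missing_raises)
```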


Some weights of the model checkpoint at roberta-base were not used when initializing RobertaForSequenceClassification: ['lm_head.layer_norm.bias', 'roberta.pooler.dense.bias', 'lm_head.dense.weight', 'roberta.pooler.dense.weight', 'lm_head.layer_norm.weight', 'lm_head.decoder.weight', 'lm_head.dense.bias', 'lm_head.bias']
- This IS expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at roberta-base and are newly initialized: ['classifier.dense.bias', 'classifier.

<All keys matched successfully>


##### We show the model architecture here to later compare it with the optimized model after wrapping it with *BetterTransformer*.

In [6]:
model


RobertaForSequenceClassification(
  (roberta): RobertaModel(
    (embeddings): RobertaEmbeddings(
      (word_embeddings): Embedding(50265, 768, padding_idx=1)
      (position_embeddings): Embedding(514, 768, padding_idx=1)
      (token_type_embeddings): Embedding(1, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.0, inplace=False)
    )
    (encoder): RobertaEncoder(
      (layer): ModuleList(
        (0-11): 12 x RobertaLayer(
          (attention): RobertaAttention(
            (self): RobertaSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.0, inplace=False)
            )
            (output): RobertaSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
             

##### To make use of *BetterTransformer* we only need a single line of code, calling its *transform* method and passing in the original instantiated model.

In [7]:
model_optim = BetterTransformer.transform(model)
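
##### One of the optimizations described in the *BetterTransformer* blog post is skipping computation on padding tokens. As a rough, dependency-free gauge of how much padding a batch carries, the sketch below computes the padded fraction from attention masks (1 = real token, 0 = padding). The helper `padding_fraction` and the toy masks are illustrative assumptions; whether the fast path actually exploits the padding depends on the library versions and runtime conditions.

```python
def padding_fraction(attention_masks):
    """Fraction of positions in a batch that are padding (mask value 0)."""
    total = sum(len(mask) for mask in attention_masks)
    padded = sum(mask.count(0) for mask in attention_masks)
    return padded / total if total else 0.0


# Toy batch: three sequences padded to length 8.
toy_masks = [
    [1, 1, 1, 1, 1, 1, 1, 1],
    [1, 1, 1, 1, 0, 0, 0, 0],
    [1, 1, 0, 0, 0, 0, 0, 0],
]
frac = padding_fraction(toy_masks)
print(frac)  # 10 of the 24 positions are padding
```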


##### We can now see that all 12 encoder layers of the original model were replaced with the *BertLayerBetterTransformer* class, which implements the model inference optimizations. As the output below shows, this class is also the one used for *RoBERTa*.

In [8]:
model_optim


RobertaForSequenceClassification(
  (roberta): RobertaModel(
    (embeddings): RobertaEmbeddings(
      (word_embeddings): Embedding(50265, 768, padding_idx=1)
      (position_embeddings): Embedding(514, 768, padding_idx=1)
      (token_type_embeddings): Embedding(1, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.0, inplace=False)
    )
    (encoder): RobertaEncoder(
      (layer): ModuleList(
        (0-11): 12 x BertLayerBetterTransformer()
      )
    )
  )
  (classifier): RobertaClassificationHead(
    (dense): Linear(in_features=768, out_features=768, bias=True)
    (dropout): Dropout(p=0.0, inplace=False)
    (out_proj): Linear(in_features=768, out_features=2, bias=True)
  )
)

##### We then define the *Pandas UDF* function that performs model inference.

##### This function takes as input two columns with the information needed for model scoring, *input_ids* and *attention_mask*, as *Pandas Series* objects. These are used to instantiate our custom *Dataset* and then create the corresponding *DataLoader* for providing batches of data.

##### Then the predicted labels and corresponding probability scores are returned as a *Pandas DataFrame* object.

##### Notice that we call *torch.set_num_threads(1)* so that each task runs PyTorch on a single thread. The test dataset has only about 1,000 rows, which is too small for the Microsoft Fabric infrastructure to distribute the processing over more than one Spark executor. Since we repartitioned the data into 8 partitions and the underlying executor has 8 CPU vCores, up to 8 tasks can run on it at the same time, and letting each of them start several threads could oversubscribe the cores and crash job tasks.

In [9]:
schema = StructType([
  StructField('predicted_label', IntegerType()),
  StructField('score', DoubleType()),
  StructField('exec_time', DoubleType())
])

@pandas_udf(schema)
def predict(input_ids: pd.Series, attention_mask: pd.Series) -> pd.DataFrame:
    test_dataset = TestDataset(input_ids, attention_mask)
    test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    predicted_labels = []
    scores = []
    # Limit PyTorch to one thread per task to avoid oversubscribing the executor's cores
    torch.set_num_threads(1)
    start = datetime.now()
    # Inference only, so gradient tracking is disabled
    with torch.no_grad():
        for batch in test_dataloader:
            outputs = model_optim(input_ids=batch['input_ids'], attention_mask=batch['attention_mask'])
            # Per-row class probabilities; the score is the probability of class 1
            probs = torch.softmax(outputs.logits, dim=1).tolist()
            predicted_labels += [0 if p[1] < 0.5 else 1 for p in probs]
            scores += [p[1] for p in probs]
    # Wall-clock time of this UDF call, repeated on every returned row
    total_sec = (datetime.now() - start).total_seconds()
    return pd.DataFrame({'predicted_label': predicted_labels, 'score':scores, 'exec_time': total_sec})
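
##### To illustrate the post-processing inside the UDF, the sketch below applies a softmax to a pair of logits and thresholds the class-1 probability at 0.5, which for two classes amounts to picking the class with the larger logit. It uses plain Python instead of `torch.softmax`; the helper names and the example logits are made up.

```python
import math


def softmax(logits):
    """Numerically stable softmax over a list of floats."""
    m = max(logits)
    exps = [math.exp(x - m) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


def label_and_score(logits):
    """Return (predicted_label, class-1 probability) for a two-class logit pair."""
    score = softmax(logits)[1]
    return (0 if score < 0.5 else 1), score


label_pos, score_pos = label_and_score([-1.0, 2.0])
label_neg, score_neg = label_and_score([0.5, -0.5])
print(label_pos, round(score_pos, 4))
print(label_neg, round(score_neg, 4))
```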


##### The *Pandas UDF* function is then applied to the *Spark DataFrame* representing the prepared test data.

##### We cache the *Spark DataFrame* and trigger an action (*count*) right after defining the predictions, so that the inference runs once and later steps reuse the cached results instead of recomputing them.

In [10]:
sdf_test = sdf_test.select('label', predict('input_ids', 'attention_mask').alias('predictions'))
sdf_test.cache().count()

display(sdf_test.limit(10))
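
##### The effect of caching can be pictured with a plain-Python analogy (this is not Spark code): a lazily evaluated result is recomputed on every use unless it is materialized once and reused. The names `expensive_inference`, `LazyResult`, and `counter` are hypothetical, and `cache()` is eager here for brevity, mimicking the `.cache().count()` pattern.

```python
counter = {'runs': 0}


def expensive_inference():
    """Stand-in for the UDF-based scoring; counts how often it runs."""
    counter['runs'] += 1
    return [0, 1, 1, 0]


class LazyResult:
    """Recomputes on each access unless cache() has materialized the value."""

    def __init__(self, compute):
        self._compute = compute
        self._cached = None
        self._is_cached = False

    def cache(self):
        self._cached = self._compute()
        self._is_cached = True
        return self

    def collect(self):
        return self._cached if self._is_cached else self._compute()


uncached = LazyResult(expensive_inference)
uncached.collect()
uncached.collect()
runs_without_cache = counter['runs']  # two uses, two computations

counter['runs'] = 0
cached = LazyResult(expensive_inference).cache()
cached.collect()
cached.collect()
runs_with_cache = counter['runs']  # computed once, reused afterwards

print(runs_without_cache, runs_with_cache)  # 2 1
```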


In [11]:
sdf_exec_time = sdf_test.select('predictions').withColumns({'exec_time': sdf_test['predictions'].getItem('exec_time')}).drop('predictions')
mean_exec_time = sdf_exec_time.agg(mean('exec_time')).collect()

print('Average execution time: %d seconds' % int(mean_exec_time[0].asDict()['avg(exec_time)']))
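
##### Because each call of the UDF stamps the same `exec_time` on every row it returns, the row-level mean computed above is a mean of per-call times weighted by the number of rows in each call. The sketch below contrasts the two with invented numbers; `udf_calls` and its values are illustrative and are not measurements from this notebook.

```python
# Each tuple: (rows returned by one UDF call, seconds that call took). Invented values.
udf_calls = [(100, 8.0), (100, 8.0), (200, 11.0)]

# Row-level mean: every row carries the exec_time of the call that produced it.
row_times = [secs for rows, secs in udf_calls for _ in range(rows)]
row_level_mean = sum(row_times) / len(row_times)

# Per-call mean: each call counts once, regardless of how many rows it handled.
per_call_mean = sum(secs for _, secs in udf_calls) / len(udf_calls)

print(row_level_mean, per_call_mean)  # 9.5 9.0
```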


Average execution time: 9 seconds


##### We split the results into separate columns for the predicted label and corresponding probability score in the *Spark DataFrame* object.

In [12]:
sdf_predicted_test = sdf_test.withColumns({'predicted_label': sdf_test['predictions'].getItem('predicted_label'),
                                           'score': sdf_test['predictions'].getItem('score')}).drop('predictions')

display(sdf_predicted_test.limit(10))


##### We then write the test dataset with predictions as *parquet* files in the *Lakehouse*.

In [13]:
predicted_data_dir_spark = 'abfss://<YOUR FABRIC WORKSPACE NAME>@msit-onelake.dfs.fabric.microsoft.com/<YOUR FABRIC LAKEHOUSE NAME>.Lakehouse/Files/predicted_data'

sdf_predicted_test.write.parquet(os.path.join(predicted_data_dir_spark, 'predicted_test_data_optim'), mode='overwrite')


##### Unmount the *Lakehouse* local path.

In [14]:
mssparkutils.fs.unmount('/lakehouse')


True