### Multi-node distributed inference of *Hugging Face* models with *Transformers Pipeline* and *Pandas UDF*

##### Here we perform model inference with an NLP model from [*Hugging Face*](https://huggingface.co/) that we have already fine-tuned. We use [*Transformers Pipeline*](https://huggingface.co/docs/transformers/main_classes/pipelines) to run the inference task and [*Pandas user-defined functions*](https://docs.microsoft.com/en-us/azure/databricks/spark/latest/spark-sql/udf-python-pandas) on *Spark* to distribute the work across the cluster and parallelize the process.

Import the necessary packages.

In [0]:
import os
import time

import pandas as pd

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import FloatType, IntegerType, StructType, StructField

import torch
from torch.utils.data import DataLoader, SequentialSampler

from transformers import pipeline
from transformers import AutoConfig, AutoTokenizer, AutoModelForSequenceClassification



Define the number of workers in the cluster and the folder in the Databricks File System to load the prepared data from.

In [0]:
num_workers = 8

test_data_folder = '/test_data'

Read the testing data into a *Spark DataFrame* and repartition it into as many partitions as there are workers in the cluster. Because the data is relatively small, this ensures that every worker receives a share of the rows, and it illustrates the parallel processing with *Pandas UDF* below.

In [0]:
sdf_test = spark.read.parquet(os.path.join(test_data_folder, 'test_data')).repartition(num_workers)
display(sdf_test.select(sdf_test.columns).limit(10))

text,label,id
"a charming , banter-filled comedy . . . one of those airy cinematic bon bons whose aims -- and by extension , accomplishments -- seem deceptively slight on the surface .",1,60129542207
the attempt to build up a pressure cooker of horrified awe emerges from the simple fact that the movie has virtually nothing to show .,0,60129542175
marinated in clichés and mawkish dialogue .,0,60129542187
a dark comedy that goes for sick and demented humor simply to do so . the movie is without intent .,0,60129542230
the world needs more filmmakers with passionate enthusiasms like martin scorsese . but it doesn't need gangs of new york .,0,60129542260
"as his circle of friends keeps getting smaller one of the characters in long time dead says 'i'm telling you , this is f * * * ed' . maybe he was reading the minds of the audience .",0,60129542214
scherfig's light-hearted profile of emotional desperation is achingly honest and delightfully cheeky .,1,60129542263
the plot is romantic comedy boilerplate from start to finish .,0,60129542188
humor in i spy is so anemic .,0,60129542206
"the increasingly diverse french director has created a film that one can honestly describe as looking , sounding and simply feeling like no other film in recent history .",1,60129542242

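The effect of the repartitioning step above can be pictured with a small pure-Python sketch. It assumes a simple round-robin assignment of rows to partitions, which is how `repartition(n)` without column arguments distributes rows; in Spark the exact sizes are only approximately balanced, and the row count used here is a hypothetical value, not the size of the actual test set.

```python
from collections import Counter
from typing import List


def round_robin_partition_sizes(num_rows: int, num_partitions: int) -> List[int]:
    """Count how many rows each partition receives under round-robin assignment."""
    counts = Counter(row_idx % num_partitions for row_idx in range(num_rows))
    return [counts.get(p, 0) for p in range(num_partitions)]


# Hypothetical row count; 8 matches num_workers in the notebook
sizes = round_robin_partition_sizes(num_rows=1003, num_partitions=8)
print(sizes)
```

With one partition per worker, each worker processes roughly the same number of rows, so no worker sits idle while another handles the whole dataset.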

Here we instantiate the tokenizer and configuration of the pre-trained model from Hugging Face, then load the model itself with the fine-tuned weights stored in the model folder.

In [0]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model_type = 'microsoft/deberta-v3-base'
max_length = 128
hidden_dropout_prob = 0.
attention_probs_dropout_prob = 0.
num_labels = 2

model_folder = '/dbfs/model_outputs'
tokenizer = AutoTokenizer.from_pretrained(model_type)
config = AutoConfig.from_pretrained(model_type, hidden_dropout_prob=hidden_dropout_prob, attention_probs_dropout_prob=attention_probs_dropout_prob, num_labels=num_labels)
model = AutoModelForSequenceClassification.from_pretrained(model_folder, config=config).to(device)

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


We then create a [*TextClassificationPipeline*](https://huggingface.co/docs/transformers/v4.21.3/en/main_classes/pipelines#transformers.TextClassificationPipeline) object, which allows us to make model predictions directly from the input text. In this case, the *tokenization* is performed on-the-fly.

In [0]:
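# Use the first GPU when one is available; otherwise fall back to the CPU (device=-1)
pipe = pipeline('text-classification', tokenizer=tokenizer, model=model, device=0 if torch.cuda.is_available() else -1)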

Here is where we define the function that performs model inference over the testing data, using the *pipeline* object defined above.

The model inference is performed in parallel, over the Spark DataFrame, using Spark's [*Pandas UDF*](https://docs.microsoft.com/en-us/azure/databricks/spark/latest/spark-sql/udf-python-pandas) functionality.

In [0]:
schema = StructType([
  StructField('label', IntegerType()),
  StructField('score', FloatType())
])

@pandas_udf(schema)
def predict(text: pd.Series) -> pd.DataFrame:
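    # The pipeline returns one {'label', 'score'} dict per input text
    pred = pipe(text.to_list())
    res = pd.DataFrame(pred)
    # Map the pipeline's string labels to integers in a single step,
    # avoiding chained assignment (res['label'][mask] = ...)
    res['label'] = res['label'].map({'LABEL_0': 0, 'LABEL_1': 1}).astype(int)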
    return res
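The post-processing inside the function can be checked locally with plain pandas, without a cluster or a model. The sketch below is only an illustration under stated assumptions: `fake_pipe` is a hypothetical stand-in for the real pipeline (it merely mimics the list of `{'label', 'score'}` dictionaries that a text-classification pipeline returns), `predict_batch` is a hypothetical name for the undecorated function body, and the two hand-made batches imitate Spark calling the function once per batch of rows.

```python
import pandas as pd


def fake_pipe(texts):
    """Hypothetical stand-in for the Transformers pipeline (not the real model).

    Returns one {'label', 'score'} dict per input text.
    """
    return [
        {'label': 'LABEL_1' if 'good' in t else 'LABEL_0', 'score': 0.9}
        for t in texts
    ]


def predict_batch(text: pd.Series) -> pd.DataFrame:
    """Same post-processing as the UDF body, applied to a single batch."""
    res = pd.DataFrame(fake_pipe(text.to_list()))
    res['label'] = res['label'].map({'LABEL_0': 0, 'LABEL_1': 1}).astype(int)
    return res


texts = pd.Series(['a good film', 'a dull film', 'good fun', 'no surprises'])

# Imitate Spark invoking the function once per batch, then combine the results
batches = [texts.iloc[:2], texts.iloc[2:]]
result = pd.concat([predict_batch(b) for b in batches], ignore_index=True)
print(result)
```

Each call sees only its own batch, which is why the function must not rely on state shared across rows of the full DataFrame.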

The model inference function defined above is then executed through the *select* function from the *DataFrame* API. It returns a new column with the predicted labels and corresponding scores, given by the *pipeline* execution.

We then split the returned column into two separate columns, *predicted_label* and *score*, corresponding to its components.

In [0]:
sdf_test = sdf_test.select('id', 'text', 'label', predict('text').alias('predictions'))

sdf_test = sdf_test.withColumns({'predicted_label': sdf_test['predictions'].getItem('label'),
                                 'score': sdf_test['predictions'].getItem('score')}).drop('predictions')

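Display the testing data with the predicted values. Because Spark evaluates transformations lazily, the model inference is only executed when this action is triggered, so the measured duration includes the inference time.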

In [0]:
start_time = time.time()
display(sdf_test.select(sdf_test.columns).limit(10))
end_time = time.time()

print('\nTotal Duration: %f' % (round(end_time-start_time, 3)))

id,text,label,predicted_label,score
51539607677,"the ethos of the chelsea hotel may shape hawke's artistic aspirations , but he hasn't yet coordinated his own dv poetry with the beat he hears in his soul .",0,0,0.9518424
51539607570,a preposterously melodramatic paean to gang-member teens in brooklyn circa 1958 .,0,0,0.98857343
51539607658,"director roger kumble offers just enough sweet and traditional romantic comedy to counter the crudity . and there's the inimitable diaz , holding it all together .",1,1,0.99498653
51539607668,weighty and ponderous but every bit as filling as the treat of the title .,1,1,0.99423236
51539607653,a reworking of die hard and cliffhanger but it's nowhere near as exciting as either .,0,0,0.9904231
51539607640,"if i could have looked into my future and saw how bad this movie was , i would go back and choose to skip it . fortunately , you still have that option .",0,0,0.92581624
51539607656,a small gem from belgium .,1,1,0.98947996
51539607565,no surprises .,0,1,0.52954006
51539607669,the wild thornberrys movie is pleasant enough and the message of our close ties with animals can certainly not be emphasized enough .,1,1,0.9916142
51539607638,you don't know whether to admire the film's stately nature and call it classicism or be exasperated by a noticeable lack of pace . or both .,1,0,0.934046



Total Duration: 24.730000
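The timing pattern used above can be wrapped in a small reusable helper. The following is a minimal sketch using only the standard library; `timed` is a hypothetical helper name, and the summation inside the `with` block is just a placeholder for the Spark action (such as the `display` call), which would have to sit inside the block for the lazy computation to be measured.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label: str):
    """Measure the wall-clock time of the enclosed block and print it."""
    result = {}
    start = time.perf_counter()
    try:
        yield result
    finally:
        result['seconds'] = time.perf_counter() - start
        print(f"{label}: {result['seconds']:.3f} s")


with timed('Total Duration') as t:
    # Placeholder workload standing in for the Spark action
    total = sum(i * i for i in range(100_000))
```

`time.perf_counter()` is used here instead of `time.time()` because it is a monotonic clock intended for measuring short durations.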
