## Distributed batch inference with Dask

In [3]:
import sys
!{sys.executable} -m pip install -r requirements.txt --quiet



In [4]:
import os
import pickle
import warnings

# Set before TensorFlow is imported so its C++ log level takes effect.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' 
warnings.filterwarnings('ignore')

import gcsfs
import pandas as pd
import s3fs
import tensorflow as tf
from tensorflow.keras import activations, optimizers, losses
from transformers import DistilBertTokenizer, TFDistilBertForSequenceClassification

from scheduler_setup import loaded_models

pd.options.display.max_rows = 999
tf.get_logger().setLevel('ERROR')

None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.


ModuleNotFoundError: No module named 'tensorflow'

## Start Dask workers

In [7]:
from hyperplane import notebook_common as nc

# Start a Dask cluster on the Hyperplane platform and connect a client to it.
# The sizing values are passed straight to `nc.initialize_cluster`; their exact
# semantics are defined by that helper.
# NOTE(review): the plugin output below lists 6 worker addresses on 2 hosts
# (3 per host), consistent with num_workers=2 x nprocs=3, and
# nprocs * nthreads = 15 = cores_per_worker — presumably nprocs is processes
# per worker node; confirm against the Hyperplane docs.
client, cluster = nc.initialize_cluster(
    nprocs = 3,
    nthreads = 5,
    ram_gb_per_proc = 4,
    cores_per_worker = 15,
    num_workers = 2
)

👉 Hyperplane: selecting worker node pool
👉 Hyperplane: selecting scheduler node pool
Creating scheduler pod on cluster. This may take some time.
👉 Hyperplane: spinning up a dask cluster with a scheduler as a standalone container.
👉 Hyperplane: In a few minutes you'll be able to access the dashboard at https://ds.hyperplane.dev/dask-cluster-c76da13e-b7fa-4d2f-a620-895721e9c986/status
👉 Hyperplane: to get logs from all workers, do `cluster.get_logs()`


In [9]:
%%time
# Register a Dask worker plugin that pip-installs s3fs on every worker.
# NOTE(review): `--upgrade` with no version pin makes the worker environment
# depend on when the cluster was started; consider pinning a version.
# NOTE(review): the data below is read from a gs:// path, which needs gcsfs
# (not s3fs) on the workers — confirm gcsfs is present in the worker image.
from dask.distributed import PipInstall
plugin = PipInstall(packages=["s3fs"], pip_options=["--upgrade"])
client.register_worker_plugin(plugin)

CPU times: user 9.14 ms, sys: 2.34 ms, total: 11.5 ms
Wall time: 8.55 s


{'tcp://10.1.113.16:35955': {'status': 'OK'},
 'tcp://10.1.113.16:37409': {'status': 'OK'},
 'tcp://10.1.113.16:41277': {'status': 'OK'},
 'tcp://10.1.114.16:35745': {'status': 'OK'},
 'tcp://10.1.114.16:41753': {'status': 'OK'},
 'tcp://10.1.114.16:45565': {'status': 'OK'}}

In [10]:
# Ship the local helper module to every worker so that `predict` can run
# `from scheduler_setup import loaded_models` there.
client.upload_file('scheduler_setup.py')

{'tcp://10.1.113.16:35955': {'status': 'OK'},
 'tcp://10.1.113.16:37409': {'status': 'OK'},
 'tcp://10.1.113.16:41277': {'status': 'OK'},
 'tcp://10.1.114.16:35745': {'status': 'OK'},
 'tcp://10.1.114.16:41753': {'status': 'OK'},
 'tcp://10.1.114.16:45565': {'status': 'OK'}}

In [1]:
import dask.dataframe as dd

# Lazily open the Yelp review parquet data as a Dask DataFrame.
# Alternative (previously used) S3 location:
#   s3://d2v-tmp/demo/bach_inference/data/yelp_review_dask.parquet
df = dd.read_parquet("gs://pipeline_data/ray_data/yelp_review_dask.parquet")

print(df.npartitions)
df.head(2)

100


Unnamed: 0,review_id,user_id,business_id,stars,useful,funny,cool,text,date
0,lWC-xP3rd6obsecCYsGZRg,ak0TdVmGKo4pwqdJSTLwWw,buF9druCkbuXLX526sGELQ,4,3,1,1,Apparently Prides Osteria had a rough summer a...,1412998442000000000
1,8bFej1QE5LXp4O05qjGqXA,YoVfDbnISlW0f7abNQACIg,RA4V8pr014UyUbDvI-LW2A,4,1,0,0,This store is pretty good. Not as great as Wal...,1435955905000000000


In [2]:
# df.head(2).text.iloc[0]

"Apparently Prides Osteria had a rough summer as evidenced by the almost empty dining room at 6:30 on a Friday night. However new blood in the kitchen seems to have revitalized the food from other customers recent visits. Waitstaff was warm but unobtrusive. By 8 pm or so when we left the bar was full and the dining room was much more lively than it had been. Perhaps Beverly residents prefer a later seating. \n\nAfter reading the mixed reviews of late I was a little tentative over our choice but luckily there was nothing to worry about in the food department. We started with the fried dough, burrata and prosciutto which were all lovely. Then although they don't offer half portions of pasta we each ordered the entree size and split them. We chose the tagliatelle bolognese and a four cheese filled pasta in a creamy sauce with bacon, asparagus and grana frita. Both were very good. We split a secondi which was the special Berkshire pork secreto, which was described as a pork skirt steak wit

In [12]:
def clean_text(df: pd.DataFrame) -> pd.DataFrame:
    """Normalise the ``text`` column of one partition before tokenisation.

    Every run of carriage returns, newlines and tabs is collapsed into a single
    space, then the text is lower-cased. Deleting those characters outright
    would glue neighbouring words together (``"great\\nfood"`` ->
    ``"greatfood"``) and hand the tokenizer non-words.

    Parameters
    ----------
    df : pd.DataFrame
        One partition containing a ``text`` column.

    Returns
    -------
    pd.DataFrame
        A new frame with the cleaned ``text`` column. The input partition is
        left untouched; mutating partitions inside ``map_partitions`` is unsafe.
    """
    cleaned = (
        df['text']
        .str.replace(r'[\r\n\t]+', ' ', regex=True)
        .str.lower()
    )
    return df.assign(text=cleaned)

# Clean every partition, then force the review text to plain Python strings.
df = df.map_partitions(clean_text)
df = df.assign(text=df['text'].astype(str))
df.dtypes

review_id      object
user_id        object
business_id    object
stars           int64
useful          int64
funny           int64
cool            int64
text           object
date            int64
dtype: object

In [13]:
# Materialise the cleaned partitions in cluster memory, so that later steps
# (len, sample, inference) reuse them instead of re-reading and re-cleaning
# the parquet data.
df = df.persist()

## Inference

In [None]:
def predict(df: pd.DataFrame, batch_size: int) -> pd.DataFrame:
    """Classify every review in one partition with the preloaded DistilBERT model.

    Meant for ``dask.dataframe.map_partitions``. The model is fetched from
    ``scheduler_setup.loaded_models`` (the module shipped to workers with
    ``client.upload_file``). ``df.text`` is tokenised and fed through the model
    in mini-batches.

    Parameters
    ----------
    df : pd.DataFrame
        One partition; must contain a ``text`` column of strings.
    batch_size : int
        Number of reviews per forward pass.

    Returns
    -------
    pd.DataFrame
        A copy of ``df`` with an extra integer ``pred`` column holding the
        arg-max class index for each row. The input frame is not modified.
    """
    import numpy as np
    from scheduler_setup import loaded_models

    if len(df) == 0:
        # Tokenising and concatenating zero batches fails; keep the output schema instead.
        return df.assign(pred=np.array([], dtype=np.int64))

    model, model_name, max_len = loaded_models['model']

    # NOTE(review): reloading the tokenizer for every partition repeats work;
    # consider caching it next to the model in scheduler_setup.
    tokenizer = DistilBertTokenizer.from_pretrained(model_name)
    encoded = tokenizer(
        df['text'].tolist(),
        padding='max_length',
        truncation=True,
        # `max_len` ships with the model but was previously unused, so inputs
        # were padded to the tokenizer's model maximum instead.
        # NOTE(review): assumes max_len is the sequence length the model expects.
        max_length=max_len,
        return_tensors='tf',
    )

    batches = tf.data.Dataset.from_tensor_slices(
        (encoded['input_ids'], encoded['attention_mask'])
    ).batch(batch_size)

    batch_labels = []
    for token_ids, masks in batches:
        logits = model(token_ids, attention_mask=masks).logits
        # Pick the winning class per row (last axis). Softmax is monotonic within
        # a row, so it is not needed for the arg-max. The previous softmax over
        # axis=0 normalised across the *batch* and could change the winning label
        # (a batch of one always yielded class 0).
        batch_labels.append(np.argmax(logits.numpy(), axis=-1))

    return df.assign(pred=np.concatenate(batch_labels))

In [15]:
%%time
# Smoke test: `df.head(5)` returns a pandas frame, so `predict` runs here in the
# client process (scheduler_setup is also imported locally at the top)
# before being distributed over the cluster.
df_local = df.head(5)
print(df_local.shape)
predict(df_local, batch_size=8)

(5, 9)
CPU times: user 9.07 s, sys: 1.82 s, total: 10.9 s
Wall time: 2.69 s


Unnamed: 0,review_id,user_id,business_id,stars,useful,funny,cool,text,date,pred
0,lWC-xP3rd6obsecCYsGZRg,ak0TdVmGKo4pwqdJSTLwWw,buF9druCkbuXLX526sGELQ,4,3,1,1,apparently prides osteria had a rough summer a...,1412998442000000000,0
1,8bFej1QE5LXp4O05qjGqXA,YoVfDbnISlW0f7abNQACIg,RA4V8pr014UyUbDvI-LW2A,4,1,0,0,this store is pretty good. not as great as wal...,1435955905000000000,1
2,NDhkzczKjLshODbqDoNLSg,eC5evKn1TWDyHCyQAwguUw,_sS2LBIGNT5NQb6PD1Vtjw,5,0,0,0,i called wvm on the recommendation of a couple...,1369773486000000000,1
3,T5fAqjjFooT4V0OeZyuk1w,SFQ1jcnGguO0LYWnbbftAA,0AzLzHfOJgL7ROwhdww2ew,2,1,1,1,i've stayed at many marriott and renaissance m...,1262917755000000000,0
4,sjm_uUcQVxab_EeLCqsYLg,0kA0PAJ8QFMeveQWHFqz2A,8zehGz9jnxPqXtOc7KaJxA,4,0,0,0,the food is always great here. the service fro...,1311876301000000000,1


In [16]:
print(len(df))
# Pass the fraction by keyword. Dask's `sample` has no real support for `n`:
# a positional 0.001 lands in `n` and is only reinterpreted as `frac` with a
# warning, which the warnings filter at the top silences.
# A fixed random_state makes the sampled rows reproducible across runs.
df_sample = df.sample(frac=0.001, random_state=42)
len(df_sample)

1000000


1000

In [17]:
from numpy import dtype
# Output schema for map_partitions: the input columns plus the new `pred` column.
# np.argmax produces 64-bit integer labels on 64-bit platforms, so declare
# 'int64' explicitly. Plain 'int' maps to a platform-dependent width
# (int32 on Windows with NumPy < 2).
meta = {**df_sample.dtypes.to_dict(), 'pred': 'int64'}

df_result = df_sample.map_partitions(predict, batch_size=8, meta=meta)

In [18]:
%%time
# Trigger the distributed inference and gather the result on the client as a
# pandas frame; its shape shows the sampled rows plus the added `pred` column.
df_result_local = df_result.compute()
df_result_local.shape

CPU times: user 743 ms, sys: 19.1 ms, total: 762 ms
Wall time: 1min 40s


(1000, 10)

In [19]:
# Release cluster resources: close the client connection first, then shut down
# the scheduler and workers.
client.close()
cluster.close()

#### With Hyperplane Dask distributed, inference on 1,000 samples finished in 1 min 42 s
In comparison, a non-parallelized version has an estimated run time of 8 min 46 s, so this is a 5.1x speed-up, which roughly corresponds to the 6 worker processes in the Dask cluster. With more workers and GPUs, the speed-up should be even larger.