# NLP tf-idf pipeline with cuML + Dask

In [None]:
%matplotlib inline
from dask_cuda import LocalCUDACluster
from dask.distributed import Client, wait
import dask_cudf
from cuml.feature_extraction.text import HashingVectorizer
from cuml.dask.feature_extraction.text import TfidfTransformer
import nltk
import cupy as cp
from tqdm import tqdm
import dask

In [None]:
# Import other utility functions for benchmarking purposes
from utils import SimpleTimer, ResultsLogger, scale_workers
from utils import visualize_data_cuml as visualize_data

## Setting up the Dask CUDA cluster

In [None]:
# Create a local CUDA cluster over the eight listed GPUs and connect a client
cluster = LocalCUDACluster(
    CUDA_VISIBLE_DEVICES="0,1,2,3,4,5,6,7",
    local_directory="/raid/anirband/dask",
    rmm_pool_size="30GB",
)
client = Client(cluster)

In [None]:
# Show the client summary as the cell output
client

## Benchmarking Code

### Helper Functions

In [None]:
# Download the NLTK stop-word corpus and load the English stop-word list.
nltk.download('stopwords')
STOPWORDS = nltk.corpus.stopwords.words('english')

# Tokens that text_preprocessor replaces with a single space.
PUNCTUATIONS = [
    '!', '"', '#', '$', '%', '&', '(', ')', '*', '+', '-', '.', '/', '\\',
    ':', ';', '<', '=', '>', '?', '@', '[', ']', '^', '_', '`', '{', '|',
    '}', '\t', '\n', "'", ",", '~', '—',
]

def read_data(client, parquet_path, benchmark):
    """
    Build a dask_cudf dataframe holding the "review_body" column of the
    parquet dataset at ``parquet_path`` (three row groups per partition).

    When ``benchmark`` is truthy the dataframe is persisted through
    ``client``, the call blocks until it is ready, and the row count is
    printed. Otherwise the dataframe is returned as built.
    """
    frame = dask_cudf.read_parquet(
        parquet_path,
        columns=["review_body"],
        row_groups_per_part=3,
    )
    if not benchmark:
        return frame

    # Materialize on the cluster so the caller's timer covers the actual read.
    frame = client.persist(frame)
    wait(frame)
    row_count = frame.shape[0].compute()
    print(row_count)
    return frame


def text_preprocessor(data, client, column_name, PUNCTUATIONS,
                      STOPWORDS, benchmark):
    """
    Clean the text stored in ``data[column_name]``.

    Rows where ``column_name`` is null are dropped. The remaining text is
    lower-cased, every token in ``PUNCTUATIONS`` is replaced by a space, every
    token in ``STOPWORDS`` is replaced by an empty string, and the result is
    passed through ``normalize_spaces`` and ``strip``.

    When ``benchmark`` is truthy the cleaned dataframe is persisted through
    ``client``, the call blocks until it is ready, and the row count is
    printed.
    """
    data = data[data[column_name].notnull()]

    text = data[column_name].str.lower()
    text = text.str.replace_tokens(PUNCTUATIONS, [" "] * len(PUNCTUATIONS))
    text = text.str.replace_tokens(STOPWORDS, "")
    text = text.str.normalize_spaces().str.strip()
    data[column_name] = text

    if not benchmark:
        return data

    data = client.persist(data)
    wait(data)
    row_count = data.shape[0].compute()
    print(row_count)
    return data


def hashing_vectorizer(data, client, column_name, benchmark):
    """
    Run a cuML ``HashingVectorizer`` over each partition of
    ``data[column_name]`` and cast the output to float32.

    When ``benchmark`` is truthy the output is persisted through ``client``,
    the call blocks until it is ready, chunk sizes are computed, and the
    resulting shape is printed.
    """
    hasher = HashingVectorizer(stop_words=None, preprocessor=None)

    # ``meta`` describes the per-partition output for Dask: a dask array
    # wrapping a one-element float32 CuPy CSR matrix (not a dataframe).
    sparse_sample = cp.sparse.csr_matrix(cp.zeros(1, dtype=cp.float32))
    meta = dask.array.from_array(sparse_sample)

    per_partition = data[column_name].map_partitions(
        hasher.fit_transform, meta=meta
    )
    hashing_vectorized = per_partition.astype(cp.float32)

    if not benchmark:
        return hashing_vectorized

    hashing_vectorized = client.persist(hashing_vectorized)
    wait(hashing_vectorized)
    # Chunk sizes are computed first so the printed shape is concrete.
    hashing_vectorized.compute_chunk_sizes()
    print(hashing_vectorized.shape)
    return hashing_vectorized


def tfidf_transformer(data, client, benchmark):
    """
    Fit a ``TfidfTransformer`` created with ``client`` on ``data`` and return
    the transformed output.

    When ``benchmark`` is truthy the output is persisted through ``client``,
    the call blocks until it is ready, chunk sizes are computed, and the
    resulting shape is printed.
    """
    tfidf = TfidfTransformer(client=client).fit_transform(data)
    if not benchmark:
        return tfidf

    tfidf = client.persist(tfidf)
    wait(tfidf)
    tfidf.compute_chunk_sizes()
    print(tfidf.shape)
    return tfidf

def tfidf_transformer_POC(data, client, benchmark):
    """
    Variant of the tf-idf stage that creates ``TfidfTransformer`` without an
    explicit client argument; ``client`` is only used for persisting.

    When ``benchmark`` is truthy the output is persisted through ``client``,
    the call blocks until it is ready, chunk sizes are computed, and the
    resulting shape is printed.
    """
    tfidf = TfidfTransformer().fit_transform(data)
    if not benchmark:
        return tfidf

    tfidf = client.persist(tfidf)
    wait(tfidf)
    tfidf.compute_chunk_sizes()
    print(tfidf.shape)
    return tfidf


def execute_full_pipeline(n, i, client, parquet_path, worker_counts=None,
                          result_path="./results.csv", benchmark=True):
    """
    Run the four pipeline stages once and record how long each one takes.

    Stages, in order: read_data, text_preprocessor, hashing_vectorizer and
    tfidf_transformer. Each stage is wrapped in its own SimpleTimer.

    Parameters
    ----------
    n, i
        Worker count and sample-run index; only used in the printed messages.
    client
        Dask client forwarded to every stage.
    parquet_path
        Location of the parquet dataset passed to read_data.
    worker_counts, result_path
        Not used by this function; accepted so existing callers keep working.
        worker_counts defaults to None rather than a shared mutable list.
    benchmark
        Forwarded to the first three stages. The tf-idf stage is always called
        with benchmark=True, so the final result is materialized inside its
        timer regardless of this flag.

    Returns
    -------
    tuple
        (preprocessed data, tf-idf result, sample_record). sample_record holds
        the per-stage timings; its "overall" and "nrows" entries are left at 0
        by this function.
    """
    sample_record = {"overall": 0, "data_read": 0, "hashing_vectorizer": 0,
                     "tfidf_transformer": 0, "data_preprocessing": 0, "nrows": 0}

    # NOTE(review): timer.elapsed is divided by 1e9 and reported with an "s"
    # suffix, so it is presumably in nanoseconds; confirm in utils.SimpleTimer.
    with SimpleTimer() as timer:
        data = read_data(client, parquet_path, benchmark)
    sample_record["data_read"] = timer.elapsed/1e9

    with SimpleTimer() as timer:
        data = text_preprocessor(data, client, "review_body", PUNCTUATIONS, STOPWORDS, benchmark)
    sample_record["data_preprocessing"] = timer.elapsed/1e9

    with SimpleTimer() as timer:
        hashing_vectorized = hashing_vectorizer(data, client, "review_body", benchmark)
    sample_record["hashing_vectorizer"] = timer.elapsed/1e9

    # The last stage is always materialized so that its timer covers the
    # whole deferred graph when the earlier stages were not persisted.
    with SimpleTimer() as timer:
        result = tfidf_transformer(hashing_vectorized, client, benchmark=True)
    sample_record["tfidf_transformer"] = timer.elapsed/1e9

    print(f"Workers:{n}, Sample Run:{i}, Finished loading data in {sample_record['data_read']}s")
    print(f"Workers:{n}, Sample Run:{i}, Finished preprocessing data in {sample_record['data_preprocessing']}s")
    print(f"Workers:{n}, Sample Run:{i}, Finished fitting HashVectorizer in {sample_record['hashing_vectorizer']}s")
    print(f"Workers:{n}, Sample Run:{i}, Finished fitting IDF Transformer in {sample_record['tfidf_transformer']}s")

    return data, result, sample_record

In [None]:
def performance_numbers(client, parquet_path, worker_counts=None, samples=1, result_path="./results.csv", benchmark=True):
    """
    Main function to perform the performance sweep.

    For every entry in ``worker_counts`` the cluster is scaled with
    scale_workers, then the full pipeline is executed ``samples`` times. Each
    run's timing record is extended with the overall time, the row count, the
    worker count and the run index, and handed to a ResultsLogger. The logger
    is written out once, after all runs have finished.

    Parameters
    ----------
    client
        Dask client used for scaling and for every pipeline stage.
    parquet_path
        Location of the parquet dataset.
    worker_counts
        Iterable of worker counts to sweep over. Defaults to ``[1]`` when
        None is given (a None sentinel avoids a shared mutable default).
    samples
        Number of pipeline runs per worker count.
    result_path
        Path passed to ResultsLogger.
    benchmark
        Forwarded to execute_full_pipeline.
    """
    if worker_counts is None:
        worker_counts = [1]

    results_logger = ResultsLogger(result_path)
    for n in worker_counts:
        scale_workers(client, n)

        for i in tqdm(range(samples)):
            with SimpleTimer() as overalltimer:
                data, _, sample_record = execute_full_pipeline(
                    n, i, client, parquet_path,
                    worker_counts=[1], result_path=result_path, benchmark=benchmark)
            sample_record["overall"] = overalltimer.elapsed/1e9
            # Counted outside the timer so it does not affect the measurement.
            sample_record["nrows"] = data.shape[0].compute()
            sample_record["n_workers"] = n
            sample_record["sample_run"] = i
            print(f"Workers:{n}, Sample Run:{i}, Finished executing full pipeline in {overalltimer.elapsed/1e9}s")
            results_logger.log(sample_record)
    results_logger.write()

## Benchmark latency by materializing the intermediate dataframe(s) in every stage

In [None]:
# Configuration for the sweep that persists every intermediate stage.
dataset = "Books"
samples = 5
worker_counts = [8]  # [2,4,6,8]

# Alternative dataset locations:
#parquet_path = 's3://amazon-reviews-pds/parquet/product_category=Camera/*.parquet'
#parquet_path = f'./data/product_category={dataset}'
parquet_path = f"/raid/amazon_reviews_dataset/product_category={dataset}"
result_path = f"./results/result_poc_nlp_dask_{dataset}_persist.csv"

In [None]:
%%time
performance_numbers(client, parquet_path=parquet_path, worker_counts=worker_counts, samples=samples, result_path=result_path, benchmark=True)

In [None]:
# Pass the results file to the visualization helper, which returns two objects
# (presumably the raw and the melted result tables; confirm in utils).
data, melt_data = visualize_data(result_path)        

In [None]:
# Per worker count: mean, standard deviation and count of every column
groupby = data.groupby("n_workers").agg(['mean', 'std', 'count'])

In [None]:
# Show the aggregated statistics as the cell output
groupby

## Benchmark latency without materializing the intermediate dataframe(s) except for the last stage

In [None]:
# Restart the client before performing the following set of experiments.
client.restart()

In [None]:
# Configuration for the sweep that only materializes the final stage.
dataset = "Books"
samples = 5
worker_counts = [8]  # [2,4,6,8]

# Alternative dataset locations:
#parquet_path = 's3://amazon-reviews-pds/parquet/product_category=Camera/*.parquet'
#parquet_path = f'./data/product_category={dataset}'
parquet_path = f"/raid/amazon_reviews_dataset/product_category={dataset}"
result_path = f"./results/result_poc_nlp_dask_{dataset}_nonpersist.csv"

In [None]:
%%time
performance_numbers(client, parquet_path=parquet_path, worker_counts=worker_counts, samples=samples, result_path=result_path, benchmark=False)

In [None]:
# Pass the results file to the visualization helper, which returns two objects
# (presumably the raw and the melted result tables; confirm in utils).
data, melt_data = visualize_data(result_path)        

In [None]:
# Per worker count: mean, standard deviation and count of every column
groupby = data.groupby("n_workers").agg(['mean', 'std', 'count'])

In [None]:
# Show the aggregated statistics as the cell output
groupby

In [None]:
# Shut down: close the client first, then the cluster it was connected to
client.close()
cluster.close()