In [1]:
import cupy as cp
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
from cuml.dask.common import to_sparse_dask_array
from cuml.dask.naive_bayes import MultinomialNB
import dask
from cuml.dask.feature_extraction.text import TfidfTransformer


from cuml.feature_extraction.text import TfidfTransformer as CumlTfidfTransformer
from cuml.feature_extraction.text import HashingVectorizer as CumlHashVect

import os
import cudf
import dask_cudf
from cudf import Series
import cupy as cp
from distributed import wait
import numpy as np
from scipy.sparse import random

In [2]:
# Create a local CUDA cluster with default settings and connect a Dask
# distributed client to it. The Dask work in the following cells relies on
# this client.
cluster = LocalCUDACluster()
client = Client(cluster)

In [3]:
# Display the client summary (scheduler address, dashboard URL, worker count).
client

0,1
Client  Scheduler: tcp://127.0.0.1:45039  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 4  Memory: 404.32 GB


# Read + Filter Data

In [4]:
# NOTE(review): absolute, machine-specific glob; point it at your own CSV
# files before running this notebook elsewhere.
path='/nvme/0/vjawa/string_experiments/tweets/*.CSV'
df = dask_cudf.read_csv(path)
# Keep only rows whose `lang` column equals 'en' and persist the filtered
# frame on the cluster so later cells reuse it.
df=df[df.lang=='en'].persist()
# Block until the persisted frame has finished computing.
_ = wait(df)

In [5]:
# Preview the first 10 values of the `text` column.
df['text'].head(10)

2     “People are just storing up. They are staying ...
6     .@PatriceHarrisMD spoke with @YahooFinance abo...
7     First medical team aiding #Wuhan in fight agai...
9     .@KathyGriffin: @realDonaldTrump Is 'Lying' Ab...
14    #CoronaUpdate | Johns Hopkins University has s...
17    Singapore donates 40,000 test kits to the Phil...
19    It's been a remarkable week for bold policy an...
26    #Pentagon activates special protocols to fight...
28    Lockdowns, drones: Will things ever go back to...
30    Does the #COVID19 pandemic have you feeling ou...
Name: text, dtype: object

# Actual TF-IDF Run (Hashing Vectorizer + Transformer)

In [6]:
%%time
# Single-GPU hashing vectorizer that is mapped over every partition below.
# The custom preprocessor only lower-cases the text.
vec = CumlHashVect(stop_words = 'english',preprocessor= lambda ser:ser.str.lower())
# Dask (multi-GPU) TF-IDF transformer; note the variable name is misspelled
# ("mutli") and is referenced under this exact name by later cells.
mutli_gpu_transformer = TfidfTransformer()

# `meta` is a placeholder describing the kind of block map_partitions yields:
# a Dask array wrapping a float32 CuPy CSR matrix.
meta = dask.array.from_array(cp.sparse.csr_matrix(cp.zeros(1, dtype=cp.float32)))
# Hash each partition of the `text` column independently.
X = df["text"].map_partitions(vec.fit_transform, meta=meta)
X = X.astype(np.float32).persist()
# Resolve the chunk sizes of the mapped result
# (presumably unknown after map_partitions - TODO confirm).
X.compute_chunk_sizes()
wait(X)
# Fit the distributed TF-IDF model and transform the hashed counts.
# ("X_transormed" is misspelled but is the name later cells use.)
X_transormed = mutli_gpu_transformer.fit_transform(X).persist()
wait(X_transormed)
# Last expression of the cell: resolves chunk sizes and displays the array summary.
X_transormed.compute_chunk_sizes()

[I] [13:35:16.399707] [Delayed('_merge_stats_to_model-9f97e3b4-90b8-4cc1-93b6-9e2ebe9d4cbb')]
[I] [13:35:16.401723] [Delayed('_merge_stats_to_model-ac7414d0-a80b-4380-9d85-8db21f8cb68f')]
[I] [13:35:16.403639] [Delayed('_merge_stats_to_model-3bda724f-4a29-421b-b123-de4516705abf')]
[I] [13:35:16.404669] [Delayed('_merge_stats_to_model-cb173d21-0982-4196-a024-6d831ccf6468')]
[I] [13:35:16.450830] [<Future: finished, type: cuml.TfidfTransformer, key: _merge_stats_to_model-9f97e3b4-90b8-4cc1-93b6-9e2ebe9d4cbb>]
[I] [13:35:16.450894] [<Future: finished, type: cuml.TfidfTransformer, key: _merge_stats_to_model-ac7414d0-a80b-4380-9d85-8db21f8cb68f>]
[I] [13:35:16.451141] [<Future: finished, type: cuml.TfidfTransformer, key: _merge_stats_to_model-3bda724f-4a29-421b-b123-de4516705abf>]
[I] [13:35:16.451249] [<Future: finished, type: cuml.TfidfTransformer, key: _merge_stats_to_model-cb173d21-0982-4196-a024-6d831ccf6468>]
[I] [13:35:16.453126] [Delayed('_merge_stats_to_model-50977310-0a4b-446f-9f3

Unnamed: 0,Array,Chunk
Bytes,20.25 TB,1.59 TB
Shape,"(4827372, 1048576)","(378267, 1048576)"
Count,18 Tasks,18 Chunks
Type,float32,cupy.ndarray
"Array Chunk Bytes 20.25 TB 1.59 TB Shape (4827372, 1048576) (378267, 1048576) Count 18 Tasks 18 Chunks Type float32 cupy.ndarray",1048576  4827372,

Unnamed: 0,Array,Chunk
Bytes,20.25 TB,1.59 TB
Shape,"(4827372, 1048576)","(378267, 1048576)"
Count,18 Tasks,18 Chunks
Type,float32,cupy.ndarray


# Using the TF-IDF Matrix for Document Search

In [7]:
# 1x1 SciPy sparse matrix (scipy.sparse.random), used only as the `meta`
# placeholder for map_blocks below.
mat = random(1,1)
# Call `.get()` on every block (presumably copying each CuPy sparse block to
# host memory as a SciPy sparse matrix - TODO confirm).
o =  X_transormed.map_blocks(lambda x:x.get(),meta=mat)
# Materialise the full matrix in the client process.
# NOTE(review): this gathers the whole corpus into one process.
output = o.compute()
# Materialise the text and status_id columns as a single (non-Dask) frame.
cudf_df = df[['text','status_id']].compute()
# Rebuild the gathered matrix as one CuPy CSR matrix for the search helpers.
cupy_mat = cp.sparse.csr_matrix(output)

In [8]:
from cuml.common.sparsefuncs import csr_row_normalize_l2

def custom_vectorizer(X):
    """Turn raw text into TF-IDF vectors using the already-fitted model.

    Parameters
    ----------
    X : series of strings
        Documents (for example a one-element series holding a query).

    Returns
    -------
    The result of the fitted TF-IDF model's ``transform`` applied to the
    hashed representation of ``X``.

    Notes
    -----
    Reads the notebook-level ``mutli_gpu_transformer`` and resolves its
    ``internal_model`` to obtain the fitted single-GPU model.

    NOTE(review): the corpus was hashed with a vectorizer that had a custom
    lower-casing ``preprocessor``, whereas this one uses the default
    preprocessing. Confirm both tokenise text the same way.
    """
    hasher = CumlHashVect(stop_words='english')
    fitted_tfidf = mutli_gpu_transformer.internal_model.result()
    hashed_counts = hasher.transform(X)
    return fitted_tfidf.transform(hashed_counts)

def efficient_csr_cosine_similarity(query, tfidf_matrix, matrix_normalized=False):
    """Cosine similarity between the rows of two sparse matrices.

    Both operands are L2-normalised row-wise (without modifying the inputs)
    and multiplied, so each entry of the product is the cosine of the angle
    between a row of ``tfidf_matrix`` and a row of ``query``.

    Parameters
    ----------
    query : sparse CSR matrix
        Query vectors, one per row. Always normalised here.
    tfidf_matrix : sparse CSR matrix
        Document vectors, one per row.
    matrix_normalized : bool, default False
        Pass ``True`` when ``tfidf_matrix`` rows already have unit L2 norm
        to skip normalising it again.

    Returns
    -------
    The product ``tfidf_matrix . query^T``.
    """
    unit_query = csr_row_normalize_l2(query, inplace=False)
    unit_docs = (
        tfidf_matrix
        if matrix_normalized
        else csr_row_normalize_l2(tfidf_matrix, inplace=False)
    )
    return unit_docs.dot(unit_query.T)


def document_search(text_df, query, tfidf_matrix, top_n=3):
    """Return the ``top_n`` documents most similar to ``query``.

    Parameters
    ----------
    text_df : cudf.DataFrame
        Frame with a ``text`` column. Positional row ``i`` must correspond to
        row ``i`` of ``tfidf_matrix`` (rows are selected with ``iloc``).
    query : str
        Free-text search query.
    tfidf_matrix : sparse CSR matrix
        TF-IDF vectors of the documents. It is treated as already
        L2-normalised (the similarity helper is called with
        ``matrix_normalized=True``).
    top_n : int, default 3
        Number of results to return. ``0`` yields an empty frame.

    Returns
    -------
    cudf.DataFrame
        Columns ``text`` and ``similarity``, ordered from most to least
        similar.
    """
    query_vec = custom_vectorizer(Series([query]))
    similarities = efficient_csr_cosine_similarity(query_vec, tfidf_matrix, matrix_normalized=True)
    # Flatten the (n_documents x 1) result into a 1-D score vector.
    similarities = similarities.todense().reshape(-1)
    # Reverse first, then truncate. The previous ``argsort()[-top_n:][::-1]``
    # returned *every* row for top_n == 0, because ``-0`` slices from the
    # start. For all other values of top_n both forms select the same rows
    # in the same order.
    best_idx = similarities.argsort()[::-1][:top_n]

    pp = cudf.DataFrame({
        'text': text_df['text'].iloc[best_idx],
        'similarity': similarities[best_idx]
    })
    return pp

In [9]:
# Example query; returns the default top 3 matches from the gathered matrix.
document_search(cudf_df[['text']], 'computer science and NLP', cupy_mat)


Unnamed: 0,text,similarity
207793,Computer Science has been diverging as CS + cr...,0.398849
237113,Will the Coronavirus bring computer and medica...,0.359052
474055,@IBMWatson is going to save us in #coronavirus...,0.358686


In [11]:
# Second example query, again with the default top 3 matches.
document_search(cudf_df[['text']], 'nvidia-gpu', cupy_mat)

Unnamed: 0,text,similarity
149315,"Nvidia Joins War Against COVID-19 With AI, GPU...",0.439025
163154,Nvidia RTX 3000 GPU delayed due to CoronaVirus...,0.425495
104222,Over 80% of physicians expect 2nd #COVID19 out...,0.394341
