# Distributed Dask (Phase 5)

#### Input: 
   Models trained in stage 5.
#### Output:
   Distributed and cached dataframes that can scale up to a cluster.
#### Algorithm:
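   a) Read the pickled pandas dataframes and convert them to dask dataframes.<br>
   b) Persist the dataframes into distributed memory.<br>
   c) Cluster the doc2vec embeddings with dask-ml KMeans, attach the labels, and publish the dataframes and the model on the scheduler.<br>
   d) Use joblib to cache the TfidfVectorizer, SVD and Doc2Vec models.<br>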

In [1]:
from dask.distributed import Client
import dask.array as da
import dask.dataframe as dd
import joblib
import time
import gc
from gensim.models import Doc2Vec
from dask_ml.cluster import KMeans

In [2]:
# Connect a dask.distributed client. With no arguments this starts a local
# cluster on this machine (see the summary shown in the next cell).
client = Client()

In [3]:
# Show the client summary: scheduler address, dashboard link and worker resources.
client

0,1
Client  Scheduler: tcp://127.0.0.1:37417  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 4  Memory: 16.69 GB


In [4]:
def convert_to_distributed(path, partitions, choice=None):
    """Load a joblib-pickled pandas DataFrame and convert it to a dask DataFrame.

    Parameters
    ----------
    path : str
        Path of a file written with ``joblib.dump`` that holds a pandas DataFrame.
    partitions : int
        Number of partitions of the returned dask DataFrame.
    choice : optional
        When anything other than ``None``, the first row of the frame is
        discarded before conversion.

    Returns
    -------
    dask.dataframe.DataFrame
        The loaded frame, with its previous index moved into a regular column
        by ``reset_index`` and split into ``partitions`` partitions.
    """
    # NOTE(review): joblib.load unpickles the file, which can execute arbitrary
    # code -- only point this at files produced by this project.
    feature_matrix = joblib.load(path)
    if choice is not None:
        # NOTE(review): why the first row is discarded is not visible here --
        # presumably a header/sentinel row in the source pickle; confirm.
        feature_matrix = feature_matrix.iloc[1:]
    # Reassign rather than use inplace=True: after iloc[1:] the frame may be a
    # slice of the loaded object, and the non-inplace form never mutates it.
    feature_matrix = feature_matrix.reset_index()
    dask_matrix = dd.from_pandas(feature_matrix, npartitions=partitions)
    # The dask graph keeps the partition data, so this may free little memory;
    # it mainly drops the local pandas reference.
    del feature_matrix
    gc.collect()
    return dask_matrix

In [5]:
# LSA embeddings from lsa_embeddings.pkl, split into 4 dask partitions.
svd_feature_matrix = convert_to_distributed(path="./model/lsa_embeddings.pkl", partitions=4)

In [6]:
# Doc2Vec embeddings from doc2vec_embeddings.pkl, split into 4 dask partitions.
doc2vec_feature_matrix = convert_to_distributed(path="./model/doc2vec_embeddings.pkl", partitions=4)

In [7]:
# Remove the "files" column from both embedding matrices.
# NOTE(review): convert_to_distributed() moves the old index into a column; if
# the pickles had an unnamed index, an "index" column of row positions is still
# present and will be used as a KMeans feature later -- confirm and drop it too.
svd_feature_matrix, doc2vec_feature_matrix = (
    matrix.drop("files", axis=1)
    for matrix in (svd_feature_matrix, doc2vec_feature_matrix)
)

In [8]:
# Dataset frame: skip the first row (choice=1), then drop the index columns
# ("level_0", "index") together with the "keywords" column.
df = convert_to_distributed("./model/dataset.pkl", 4, choice=1).drop(
    labels=["level_0", "index", "keywords"], axis=1
)
# Store weights as floating point.
df["weights"] = df["weights"].astype(float)

In [9]:
# Preview the first rows of the prepared dataset.
df.head()

Unnamed: 0,files,weights
0,User Application for Shop Floor Automation usi...,165.796429
1,User Application for Shop Floor Automation usi...,146.191667
2,User Application for Shop Floor Automation usi...,129.714379
3,User Application for Shop Floor Automation usi...,113.209091
4,User Application for Shop Floor Automation usi...,98.419441


In [10]:
# Materialize all three frames in the workers' distributed memory; each returned
# collection is backed by the in-memory results instead of the original graph.
svd_feature_matrix, doc2vec_feature_matrix, df = client.persist(
    [svd_feature_matrix, doc2vec_feature_matrix, df]
)

  (                                                  ... 73705b671f691')
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
  % (format_bytes(len(b)), s)


In [11]:
%%time
cluster = KMeans(n_clusters=4, random_state=89)
cluster.fit(doc2vec_feature_matrix)

CPU times: user 3.84 s, sys: 1.74 s, total: 5.58 s
Wall time: 13.6 s


KMeans(algorithm='full', copy_x=True, init='k-means||', init_max_iter=None,
       max_iter=300, n_clusters=4, n_jobs=1, oversampling_factor=2,
       precompute_distances='auto', random_state=89, tol=0.0001)

In [12]:
# Wrap the fitted KMeans cluster assignments as a dask collection so they can be
# added to the dask DataFrames as a "labels" column.
# NOTE(review): assumes cluster.labels_ has one entry per row of
# doc2vec_feature_matrix, in the same order -- confirm.
labels = dd.from_array(cluster.labels_)

In [13]:
# Attach the cluster assignments to each frame as a "labels" column.
# NOTE(review): the assignment is presumably aligned on the index; df had its
# first row dropped (choice=1), so confirm its rows still line up with labels.
for frame in (doc2vec_feature_matrix, svd_feature_matrix, df):
    frame["labels"] = labels

In [14]:
# Publish the labelled frames and the fitted KMeans model on the scheduler under
# these names so other clients can fetch them with client.get_dataset(name).
# NOTE(review): re-running this cell in the same session may fail because the
# names are already published.
client.publish_dataset(
    svd_feature_matrix=svd_feature_matrix,
    doc2vec_feature_matrix=doc2vec_feature_matrix,
    df=df,
    cluster=cluster,
)

In [15]:
def cache_models(model_dir="./model"):
    """Load the trained Doc2Vec, TF-IDF and SVD models from ``model_dir``.

    Parameters
    ----------
    model_dir : str, default "./model"
        Directory containing ``doc2vec_model``, ``tfidf_model.pkl`` and
        ``svd_model.pkl``.

    Returns
    -------
    tuple
        ``(dv, tf, svd)``: the Doc2Vec model, followed by the objects loaded
        with joblib from ``tfidf_model.pkl`` and ``svd_model.pkl``.

    Notes
    -----
    NOTE(review): when wrapped with ``joblib.Memory.cache``, the cache key is
    built from this function's code and arguments only, not from the model
    files, so retraining the models in place will not invalidate the cached
    result -- clear the cache directory after retraining.
    ``joblib.load`` unpickles its input; only load trusted files.
    """
    import os

    dv = Doc2Vec.load(os.path.join(model_dir, "doc2vec_model"))
    tf = joblib.load(os.path.join(model_dir, "tfidf_model.pkl"))
    svd = joblib.load(os.path.join(model_dir, "svd_model.pkl"))
    return dv, tf, svd

In [16]:
# On-disk joblib cache for memoized function results, stored under ./model/joblib.
memory = joblib.Memory(location="./model/joblib")

In [17]:
# Memoized wrapper around cache_models: results are stored in the joblib cache
# created above and reused on later calls.
# NOTE(review): cache_models takes no arguments, so nothing in the cache key
# reflects the model files on disk -- clear ./model/joblib after retraining.
costly_compute_cached = memory.cache(func=cache_models)

In [18]:
# Measure how long it takes to obtain the models. The first call runs
# cache_models and fills the joblib cache; later calls read from that cache.
# perf_counter is the clock intended for measuring elapsed intervals.
start = time.perf_counter()
dv, tf, svd = costly_compute_cached()
end = time.perf_counter()
# Report the measurement (seconds); previously it was computed but never shown.
elapsed = end - start
elapsed

# End of Phase 5