# Parallelizer

> Classes and functions to parallelize execution over a pool of Ray actors.

In [None]:
# | default_exp parallel.parallelizer


In [None]:
# | export

from dreamai_ray.imports import *
from dreamai_ray.utils import *
from dreamai_ray.mapper import *
from dreamai_ray.index.core import *
from ray.util import ActorPool


In [None]:
#| hide

%load_ext autoreload
%autoreload 2
%reload_ext autoreload

In [None]:
# | export


class PoolActor:
    """Base class for the workers that `DataParallelizer` places in its pool.

    Every method here is a no-op placeholder that returns `None`; subclasses
    override `action` to do the per-block work.
    """

    def __init__(self, num_gpus=0.2, num_cpus=0.5) -> None:
        """Record the resource figures on the instance.

        Args:
            num_gpus: GPU share stored as `self.num_gpus`.
            num_cpus: CPU share stored as `self.num_cpus`.
        """
        # The values are only stored; nothing in this class acts on them.
        self.num_gpus, self.num_cpus = num_gpus, num_cpus

    def get_stats(self):
        """Placeholder hook; does nothing and returns `None`."""
        return None

    def reset_stats(self):
        """Placeholder hook; does nothing and returns `None`."""
        return None

    def action(self, data=None):
        """Placeholder for the per-block work; ignores `data` and returns `None`."""
        return None


@ray.remote
class DataParallelizer:
    """Ray actor that fans data blocks out over an `ActorPool` and merges the results.

    On construction it starts `num_actors` instances of `actor` and wraps them
    in a `ray.util.ActorPool`. `do_parallel` then asks `iterator` for the
    blocks, sends each block to a pool actor's `action` method and passes the
    collected outputs to `combiner`.
    """

    def __init__(
        self,
        actor: PoolActor,
        iterator,
        num_actors=2,
        num_cpus=1,
        num_gpus=0.2,
        combiner=None,
        set_actor_options=False,
        verbose=True,
    ) -> None:
        """Start the worker actors and build the pool.

        Args:
            actor: Ray actor class used for every worker. It is started with
                `.remote(num_cpus=..., num_gpus=...)` and its instances must
                expose a remote `action` method.
            iterator: Callable invoked as `iterator(**data_dict)` by
                `do_parallel`; its return value is iterated to obtain the blocks.
            num_actors: Number of workers to start. It is also handed to
                `iterator` as `num_blocks`.
            num_cpus: Passed to each worker's constructor and, when
                `set_actor_options` is true, to `actor.options(...)` as well.
            num_gpus: Same handling as `num_cpus`.
            combiner: Optional callable invoked as
                `combiner(outputs, data_dict)`. When it is `None`,
                `do_parallel` returns the uncombined outputs.
            set_actor_options: When true, workers are created through
                `actor.options(num_gpus=..., num_cpus=...)`.
            verbose: Forwarded as `show=` to every `msg` call.

        Raises:
            ValueError: If `actor` or `iterator` is `None`.
        """
        if actor is None:
            raise ValueError("Actor not provided to Data Parallelizer.")
        if iterator is None:
            raise ValueError("Iterator not provided to Data Parallelizer.")
        # Always define the attribute so `do_parallel` can test it instead of
        # failing with AttributeError when no combiner was supplied.
        self.combiner = combiner
        t1 = time()
        msg.info(
            f"CONSTRUCTOR CALLED: NUM_CPUS={num_cpus}, NUM_ACTORS={num_actors}, NUM_GPUS={num_gpus}.",
            spaced=True,
            show=verbose,
        )
        if set_actor_options:
            msg.info(
                f"SETTING ACTOR OPTIONS for {num_actors} ACTORS: NUM_CPUS={num_cpus}, NUM_GPUS={num_gpus}.",
                spaced=True,
                show=verbose,
            )
            actor_factory = actor.options(num_gpus=num_gpus, num_cpus=num_cpus)
        else:
            msg.info(
                f"ACTOR PARAMS WITHOUT OPTIONS for {num_actors} ACTORS: NUM_CPUS={num_cpus}, NUM_GPUS={num_gpus}.",
                spaced=True,
                show=verbose,
            )
            actor_factory = actor
        # Both branches start the workers the same way; only the factory differs.
        self.actors_list = [
            actor_factory.remote(num_cpus=num_cpus, num_gpus=num_gpus)
            for _ in range(num_actors)
        ]
        self.pool = ActorPool(self.actors_list)
        t2 = time()
        msg.good(
            f"ACTOR POOL CREATED in {t2-t1:.2f} seconds.", spaced=True, show=verbose
        )
        self.iterator = iterator
        self.verbose = verbose
        self.num_actors = num_actors

    def do_parallel(self, data_dict=None):
        """Run one parallel pass and return the result wrapped in a dict.

        Args:
            data_dict: Keyword arguments for `iterator`. `None` is treated as
                an empty dict. A copy is used, with `num_blocks` set to the
                number of actors, so the caller's mapping is not modified.
                The same copy is passed to `combiner`.

        Returns:
            `{"result": value}`, where `value` is `combiner`'s return value,
            or the list of per-block outputs when no combiner was configured.

        Raises:
            Exception: If `data_dict` cannot be turned into keyword arguments,
                if `iterator` raises, or if it returns `None`. The original
                error is chained as the cause.
        """
        try:
            data_dict = {} if data_dict is None else dict(data_dict)
            data_dict["num_blocks"] = self.num_actors
            data_list = self.iterator(**data_dict)
            if data_list is None:
                raise Exception("Unable to get iterator.")
        except Exception as e:
            # Keep the original message format but preserve the cause.
            raise Exception(f"Get iterator failed with error {e}.") from e
        t1 = time()
        data_list_out = list(
            self.pool.map(
                lambda processor, item: processor.action.remote(item), data_list
            )
        )
        t2 = time()
        msg.info(
            f"Time elapsed processing data = {t2-t1:.2f}.",
            spaced=True,
            show=self.verbose,
        )
        msg.info(
            f"Final data length = {len(data_list_out)}.", spaced=True, show=self.verbose
        )
        if self.combiner is None:
            # Nothing to merge with: hand back the raw per-block outputs.
            return {"result": data_list_out}
        t1 = time()
        ret = self.combiner(data_list_out, data_dict)
        t2 = time()
        msg.info(
            f"Time elapsed combining data = {t2-t1:.2f}.",
            spaced=True,
            show=self.verbose,
        )
        return {"result": ret}


@ray.remote
class IndexCreatorPoolActor(PoolActor):
    """Pool worker whose `action` hands its input to `create_index_`."""

    def action(self, data):
        """Call `create_index_` on `data` and return whatever it returns."""
        outcome = create_index_(data)
        return outcome


@ray.remote
class SearchIndexPoolActor(PoolActor):
    """Pool worker whose `action` hands its input to `search_index_`."""

    def action(self, data):
        """Call `search_index_` on `data` and return whatever it returns."""
        outcome = search_index_(data)
        return outcome


In [None]:
# | hide
# | eval: false


# Start from a fresh local Ray instance: stop any instance that is already
# running in this process before initialising a new one.
if ray.is_initialized():
    ray.shutdown()
ray.init()

2023-06-25 19:58:24,513	INFO worker.py:1627 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


0,1
Python version:,3.10.11
Ray version:,2.5.1
Dashboard:,http://127.0.0.1:8265


In [None]:
# | hide
# | eval: false


# Bucket locations shared by the demo cells that follow.
bucket = "gs://gcsfuse-talentnet-dev"
ems_folder, index_folder = f"{bucket}/ems_1", f"{bucket}/indexes_1"
ems_dim = 768  # handed to the index-creation run as `index_dim`

In [None]:
# | hide
# | eval: false

# Delete the existing index folder so the index-creation run starts clean.
bucket_del(index_folder)


[38;5;4mℹ Deleting gs://gcsfuse-talentnet-dev/indexes_1.[0m



Removing gs://gcsfuse-talentnet-dev/indexes_1/1.csv#1687705078056994...
Removing gs://gcsfuse-talentnet-dev/indexes_1/1_8.faiss#1687705078239592...
Removing gs://gcsfuse-talentnet-dev/indexes_1/2.csv#1687705078085950...
Removing gs://gcsfuse-talentnet-dev/indexes_1/2_8.faiss#1687705078252399...
/ [4/4 objects] 100% Done                                                       
Operation completed over 4 objects.                                              


In [None]:
# | hide
# | eval: false


# Parallelizer for index creation: two CPU-only workers.
index_pool_mapper = DataParallelizer.remote(
    IndexCreatorPoolActor,
    create_indexes_iter,
    combiner=create_indexes_combine,
    num_actors=2,
    num_cpus=1,
    num_gpus=0,
    verbose=True,
)

In [None]:
# | hide
# | eval: false

# Parallelizer for index search: two CPU-only workers.
search_index_mapper = DataParallelizer.remote(
    SearchIndexPoolActor,
    search_indexes_iter,
    combiner=search_indexes_combine,
    num_actors=2,
    num_cpus=1,
    num_gpus=0,
    verbose=True,
)


In [None]:
# | hide
# | eval: false

# Build the indexes through the pool and time the full round trip.
data_dict = {
    "ems_folder": ems_folder,
    "index_folder": index_folder,
    "index_dim": ems_dim,
    "verbose": True,
}
t1 = time()
res = ray.get(index_pool_mapper.do_parallel.remote(data_dict=data_dict))
t2 = time()
msg.good(f"Total Time Elapsed = {t2-t1:.2f}.", spaced=True, show=True)

[2m[36m(DataParallelizer pid=878723)[0m 
[2m[36m(DataParallelizer pid=878723)[0m [38;5;4mℹ CONSTRUCTOR CALLED: NUM_CPUS=1, NUM_ACTORS=2, NUM_GPUS=0.[0m
[2m[36m(DataParallelizer pid=878723)[0m 
[2m[36m(DataParallelizer pid=878723)[0m 
[2m[36m(DataParallelizer pid=878723)[0m [38;5;4mℹ ACTOR PARAMS WITHOUT OPTIONS for 2 ACTORS: NUM_CPUS=1,
[2m[36m(DataParallelizer pid=878723)[0m NUM_GPUS=0.[0m
[2m[36m(DataParallelizer pid=878723)[0m 
[2m[36m(DataParallelizer pid=878723)[0m 
[2m[36m(DataParallelizer pid=878723)[0m [38;5;2m✔ ACTOR POOL CREATED in 0.01 seconds.[0m
[2m[36m(DataParallelizer pid=878723)[0m 
[2m[36m(DataParallelizer pid=878723)[0m 
[2m[36m(DataParallelizer pid=878723)[0m [38;5;4mℹ Downloading gs://gcsfuse-talentnet-dev/ems_1 to
[2m[36m(DataParallelizer pid=878723)[0m /tmp/2d07f627258847a4/ems_1.[0m
[2m[36m(DataParallelizer pid=878723)[0m 


[2m[36m(DataParallelizer pid=878723)[0m Copying gs://gcsfuse-talentnet-dev/ems_1/resumes-4e2cdbeb-1e20-45ff-bded-a0a510350167_1.json...
Copying gs://gcsfuse-talentnet-dev/ems_1/resumes-4e2cdbeb-1e20-45ff-bded-a0a510350167_10.json...                           
Copying gs://gcsfuse-talentnet-dev/ems_1/resumes-4e2cdbeb-1e20-45ff-bded-a0a510350167_11.json...                           
Copying gs://gcsfuse-talentnet-dev/ems_1/resumes-4e2cdbeb-1e20-45ff-bded-a0a510350167_12.json...                           
Copying gs://gcsfuse-talentnet-dev/ems_1/resumes-4e2cdbeb-1e20-45ff-bded-a0a510350167_13.json...                           
[2m[36m(DataParallelizer pid=878723)[0m / [0/16 files][    0.0 B/267.4 KiB]   0% Done                                   
[2m[36m(DataParallelizer pid=878723)[0m Copying gs://gcsfuse-talentnet-dev/ems_1/resumes-4e2cdbeb-1e20-45ff-bded-a0a510350167_15.json...
Copying gs://gcsfuse-talentnet-dev/ems_1/resumes-4e2cdbeb-1e20-45ff-bded-a0a510350167_16.json...     

[2m[36m(DataParallelizer pid=878723)[0m [38;5;4mℹ Embeddings download time: 10.51 seconds.[0m
[2m[36m(DataParallelizer pid=878723)[0m [38;5;4mℹ Embeddings DF created of length: 16[0m
[2m[36m(DataParallelizer pid=878723)[0m [32m [repeated 11x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/ray-logging.html#log-deduplication for more options.)[0m
[2m[36m(DataParallelizer pid=878724)[0m 
[2m[36m(DataParallelizer pid=878724)[0m 
[2m[36m(DataParallelizer pid=878724)[0m 


[2m[36m(DataParallelizer pid=878723)[0m CommandException: One or more URLs matched no objects.


[2m[36m(DataParallelizer pid=878723)[0m [38;5;4mℹ Bucket Size: 0[0m
[2m[36m(DataParallelizer pid=878723)[0m [38;5;4mℹ Block Size: 8[0m
[2m[36m(DataParallelizer pid=878723)[0m [38;5;4mℹ Block Counter: 0[0m
[2m[36m(DataParallelizer pid=878723)[0m [38;5;4mℹ Setup time: 11.76 seconds.[0m
[2m[36m(DataParallelizer pid=878723)[0m [38;5;4mℹ Time elapsed processing data = 11.77.[0m
[2m[36m(DataParallelizer pid=878723)[0m [38;5;4mℹ Final data length = 2.[0m
[2m[36m(DataParallelizer pid=878723)[0m [38;5;4mℹ Uploading /tmp/2d07f627258847a4/indexes_1 to
[2m[36m(IndexCreatorPoolActor pid=878850)[0m [38;5;4mℹ DF BATCH SIZE: 8[0m


[2m[36m(DataParallelizer pid=878723)[0m Copying file:///tmp/2d07f627258847a4/indexes_1/2_8.faiss [Content-Type=application/octet-stream]...
Copying file:///tmp/2d07f627258847a4/indexes_1/1_8.faiss [Content-Type=application/octet-stream]...                        
Copying file:///tmp/2d07f627258847a4/indexes_1/1.csv [Content-Type=text/csv]... 0% Done                                    
[2m[36m(DataParallelizer pid=878723)[0m Copying file:///tmp/2d07f627258847a4/indexes_1/2.csv [Content-Type=text/csv]...
/ [0/4 files][    0.0 B/ 49.4 KiB]   0% Done                                    0% Done                                    
[2m[36m(DataParallelizer pid=878723)[0m / [1/4 files][ 49.4 KiB/ 49.4 KiB]  99% Done                                    
[2m[36m(DataParallelizer pid=878723)[0m / [2/4 files][ 49.4 KiB/ 49.4 KiB]  99% Done                                    
- [4/4 files][ 49.4 KiB/ 49.4 KiB] 100% Done                                    9% Done                           

[2m[36m(DataParallelizer pid=878723)[0m [38;5;4mℹ Uploading /tmp/2d07f627258847a4/indexes_1 to


[2m[36m(DataParallelizer pid=878723)[0m CommandException: No URLs matched: /tmp/2d07f627258847a4/indexes_1/*
[2m[36m(DataParallelizer pid=878723)[0m CommandException: 1 file/object could not be transferred.



[38;5;2m✔ Total Time Elapsed = 18.73.[0m



In [None]:
# | hide
# | eval: false


# Query the indexes with one of the stored embedding files and time the call.
qems = f"{ems_folder}/resumes-4e2cdbeb-1e20-45ff-bded-a0a510350167_10.json"
data_dict = {
    "ems": qems,
    "index_folder": index_folder,
    "k": 5,
    "verbose": True,
}
t1 = time()
res = ray.get(search_index_mapper.do_parallel.remote(data_dict=data_dict))
t2 = time()
msg.good(f"Total Time Elapsed = {t2-t1:.2f}.", spaced=True, show=True)

# `do_parallel` wraps the combined output under the "result" key.
res = res["result"]
print(f'\n\nFinal Results:\n\tDistances: {res["distances"]}\n\tIDs: {res["ids"]}')
print("\tMeta Data:")
for m in res["meta_data"]:
    print(f"\t\t{m}")


[2m[36m(DataParallelizer pid=878723)[0m [38;5;4mℹ Time elapsed combining data = 2.83.[0m
[2m[36m(DataParallelizer pid=878724)[0m [38;5;4mℹ Cached Index Folder:
[2m[36m(DataParallelizer pid=878724)[0m [38;5;4mℹ Downloading
[2m[36m(DataParallelizer pid=878724)[0m to /tmp/732ba9ce3e064e24.[0m


[2m[36m(DataParallelizer pid=878724)[0m Copying gs://gcsfuse-talentnet-dev/ems_1/resumes-4e2cdbeb-1e20-45ff-bded-a0a510350167_10.json...
[2m[36m(DataParallelizer pid=878724)[0m / [0/1 files][    0.0 B/ 16.7 KiB]   0% Done                                    



[38;5;2m✔ Total Time Elapsed = 2.01.[0m



Final Results:
	Distances: [[0.0, 0.9240111708641052, 1.0372934341430664, 1.101623296737671, 1.1049132347106934]]
	IDs: [[9, 11, 8, 13, 14]]
	Meta Data:
		{'embedding': '/tmp/e324b71dd9114694/ems_1_1/resumes-4e2cdbeb-1e20-45ff-bded-a0a510350167_10.json'}
		{'embedding': '/tmp/e324b71dd9114694/ems_1_1/resumes-4e2cdbeb-1e20-45ff-bded-a0a510350167_12.json'}
		{'embedding': '/tmp/e324b71dd9114694/ems_1_1/resumes-4e2cdbeb-1e20-45ff-bded-a0a510350167_9.json'}
		{'embedding': '/tmp/e324b71dd9114694/ems_1_1/resumes-4e2cdbeb-1e20-45ff-bded-a0a510350167_14.json'}
		{'embedding': '/tmp/e324b71dd9114694/ems_1_1/resumes-4e2cdbeb-1e20-45ff-bded-a0a510350167_15.json'}


[2m[36m(DataParallelizer pid=878724)[0m Operation completed over 1 objects/16.7 KiB.                                     


In [None]:
# # | hide
# # | eval: false


# qems = f"{ems_folder}/resumes-4e2cdbeb-1e20-45ff-bded-a0a510350167_10.json"
# data_dict = dict(ems=qems, index_folder=index_folder, k=5, verbose=True)
# search_iter = search_indexes_iter(data_dict=data_dict)
# res = [search_index_(d) for d in search_iter]
# res = combine_searches(res, data_dict)

# print(f'\n\nFinal Results:\n\tDistances: {res["distances"]}\n\tIDs: {res["ids"]}')
# print("\tMeta Data:")
# for m in res["meta_data"]:
#     print(f"\t\t{m}")


In [None]:
# # | hide
# # | eval: false

# bucket = "gs://gcsfuse-talentnet-dev"

# ems_folder = f"{bucket}/ems_1"
# index_folder = f"{bucket}/indexes_1"
# ems_dim = 768

# bucket_del(index_folder)

# data_dict = dict(
#     ems_folder=ems_folder,
#     index_folder=index_folder,
#     index_dim=ems_dim,
#     num_actors=4,
#     verbose=True,
# )

# t1 = time()

# indexes_iter = create_indexes_iter(data_dict=data_dict)
# res = [create_index_(d) for d in indexes_iter]
# indexes_up(res)

# t2 = time()
# msg.good(f"Total Time Elapsed = {t2-t1:.2f}.", spaced=True, show=True)


In [None]:
# | hide

import nbdev

# Run nbdev's export step; presumably this writes the `# | export` cells to
# the module named by `default_exp` at the top of the notebook.
nbdev.nbdev_export()

[2m[36m(DataParallelizer pid=878724)[0m [38;5;4mℹ Adding Result: [[0.        0.9240112 1.0372934 1.1016233 1.1049132]],
[2m[36m(DataParallelizer pid=878724)[0m [[ 9 11  8 13 14]][0m
[2m[36m(DataParallelizer pid=878724)[0m [38;5;2m✔ Added Result: [[0.        0.9240112 1.0372934 1.1016233 1.1049132]],
[2m[36m(DataParallelizer pid=878724)[0m [[ 9 11  8 13 14]][0m
[2m[36m(DataParallelizer pid=878724)[0m [32m [repeated 36x across cluster][0m
[2m[36m(SearchIndexPoolActor pid=878948)[0m [38;5;4mℹ Index Col:
[2m[36m(SearchIndexPoolActor pid=878948)[0m [38;5;4mℹ Index Size: 16[0m
[2m[36m(SearchIndexPoolActor pid=878948)[0m [38;5;4mℹ Ems Shape: (1, 768)[0m
[2m[36m(SearchIndexPoolActor pid=878948)[0m [38;5;2m✔ IDs: [[ 9 11  8 13 14]], Distances: [[0.        0.9240112 1.0372934
