In [1]:
%load_ext autoreload
%autoreload 2

import sys
import time
from functools import partial

# to get the ray dashboard, use `pip install 'ray[default]'`
import ray

from vflow import Vset, Vfunc, AsyncVfunc, init_args

In [2]:
# Initialize Ray, limiting it to 2 CPUs (optionally add _temp_dir='tmp').
# ignore_reinit_error=True keeps this cell re-runnable in the same kernel session:
# without it, a second ray.init() call raises a RuntimeError.
ray.init(num_cpus=2, ignore_reinit_error=True)

2022-02-08 14:26:18,298	INFO services.py:1338 -- View the Ray dashboard at http://127.0.0.1:8265


{'node_ip_address': '192.168.1.83',
 'raylet_ip_address': '192.168.1.83',
 'redis_address': '192.168.1.83:6379',
 'object_store_address': '/tmp/ray/session_2022-02-08_14-26-15_892908_257684/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2022-02-08_14-26-15_892908_257684/sockets/raylet',
 'webui_url': '127.0.0.1:8265',
 'session_dir': '/tmp/ray/session_2022-02-08_14-26-15_892908_257684',
 'metrics_export_port': 53843,
 'node_id': 'd6ba1be97c5d3422a9dc200916f6c6a14a154cb21a6d1cb122ae5cbd'}

In [3]:
def slow_fun(a, b=1):
    """Add ``b`` to ``a`` after an artificial one-second pause.

    The pause stands in for an expensive computation, so the serial and
    parallel Vset timings later in this notebook differ visibly.

    Args:
        a: left operand.
        b: right operand (default 1).

    Returns:
        ``a + b``.
    """
    time.sleep(1)  # deliberately block: simulate costly work
    result = a + b
    return result


# Build fun0..fun3: each Vfunc wraps slow_fun with b fixed to its index.
# Vfunc.__init__ needs its 'vfunc' argument to be callable or to expose a .fit method;
# a functools.partial is callable, so it qualifies.
vfuncs = [
    Vfunc(f'fun{offset}', partial(slow_fun, b=offset))
    for offset in range(4)
]

slow_set = Vset('slow_set', vfuncs)

# init_args returns one entry per argument name inside a list, so with a single
# argument ('a') we take element 0 to get something slow_set can consume directly.
args = init_args([1], ['a'])[0]

In [4]:
%%time

# Serial baseline: slow_set was built without is_async, and the recorded wall
# time (~4 s) matches four back-to-back calls of slow_fun, each sleeping 1 s.
slow_set(args)

CPU times: user 204 ms, sys: 32.1 ms, total: 236 ms
Wall time: 4 s


{(a, slow_set_0): 1,
 (a, slow_set_1): 2,
 (a, slow_set_2): 3,
 (a, slow_set_3): 4,
 '__prev__': (<vflow.vset.Vset at 0x7f9dffbf7af0>, ('init',))}

In [5]:
# 1st option to use parallelism: pass is_async=True when constructing the Vset.
# This reuses the same plain Vfuncs as slow_set; only the Vset flag differs.
fast_set = Vset('fast_set', vfuncs, is_async=True)

In [6]:
%%time

# Same four 1 s calls, but the recorded wall time drops to ~2 s. That is
# consistent with Ray running them two at a time under num_cpus=2
# (inferred from the timing, not shown directly).
fast_set(args)

CPU times: user 114 ms, sys: 25.5 ms, total: 140 ms
Wall time: 2.05 s


{(a, fast_set_0): 1,
 (a, fast_set_1): 2,
 (a, fast_set_2): 3,
 (a, fast_set_3): 4,
 '__prev__': (<vflow.vset.Vset at 0x7f9dffbf7580>, ('init',))}

In [7]:
# 2nd option: wrap each partial in an AsyncVfunc and put them in an ordinary Vset
# (no is_async flag). Note this rebinds `fast_set`, replacing the set built above.
async_vfuncs = [
    AsyncVfunc(f'fun{offset}', partial(slow_fun, b=offset))
    for offset in range(4)
]
fast_set = Vset('fast_set', async_vfuncs)

In [8]:
%%time

# AsyncVfunc version: the recorded wall time (~2 s) matches the is_async=True Vset,
# so both options give the same parallel speed-up here.
fast_set(args)

CPU times: user 95.8 ms, sys: 19.4 ms, total: 115 ms
Wall time: 2.01 s


{(a, fast_set_0): 1,
 (a, fast_set_1): 2,
 (a, fast_set_2): 3,
 (a, fast_set_3): 4,
 '__prev__': (<vflow.vset.Vset at 0x7f9dfdb525b0>, ('init',))}

In [9]:
# Shut down the Ray runtime started by ray.init() at the top of the notebook.
ray.shutdown()