 <font size="6"> <center> **[Open in Github](https://github.com/danielalcalde/apalis/blob/master/examples.ipynb)**</center></font> 

# Examples

In [1]:
import apalis
import time

## Basics

### Single Handler

In [2]:
class A:
    def __init__(self, y):
        self.y = y
    
    def expensive(self, x=1):
        cum = 0
        for i in range(10 ** 5):
            cum += i
        return x * self.y
    
    def fast(self, x=1):
        cum = 0
        for i in range(x):
            cum += i
        return 0
    
    def cheap(self, x=1):
        return x * self.y

To send an object to another process, wrap it in a Handler:

In [3]:
obj = apalis.Handler(A(2))

In [4]:
%%time
token = obj.expensive(5)
print(token()) # Calling the token yields the result of the operation

10
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 14.4 ms
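Conceptually, a Handler forwards each method call to the wrapped object and hands back a token, which blocks until the result is ready when called, much like a future. The sketch below illustrates that pattern with a hypothetical TokenHandler class. It is not apalis's implementation: it assumes a single worker thread in place of the child process so that it stays short and portable.

```python
from concurrent.futures import ThreadPoolExecutor


class TokenHandler:
    """Hypothetical sketch of the Handler/token pattern (not apalis's code).

    apalis keeps the wrapped object in a child process; this sketch uses one
    worker thread instead so it stays short and portable.
    """

    def __init__(self, obj):
        self._obj = obj
        self._pool = ThreadPoolExecutor(max_workers=1)

    def __getattr__(self, name):
        # Only called for names not found on the handler itself,
        # i.e. for the wrapped object's methods.
        method = getattr(self._obj, name)

        def call(*args, **kwargs):
            future = self._pool.submit(method, *args, **kwargs)
            # The token: calling it blocks until the result is ready.
            return future.result

        return call

    def close(self):
        self._pool.shutdown()


class A:  # trimmed-down version of the class A above
    def __init__(self, y):
        self.y = y

    def cheap(self, x=1):
        return x * self.y


obj = TokenHandler(A(2))
token = obj.cheap(5)  # returns immediately
result = token()      # blocks until the worker has finished
obj.close()
print(result)  # 10
```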


### Several Handlers

In [5]:
objsH = [apalis.Handler(A(_)) for _ in range(16)]

In [6]:
%%time
tokens = [obj.expensive(5) for obj in objsH]
print(apalis.get(tokens)) # Gets the results of the operations. 

[0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75]
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 25.2 ms


This is equivalent to:

In [7]:
print([token() for token in tokens])

[0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75]
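Issuing all calls before waiting on any token is what lets the work overlap. A minimal sketch of a get helper, assuming tokens are zero-argument callables (here the bound result methods of concurrent.futures futures, a stand-in chosen only for illustration):

```python
from concurrent.futures import ThreadPoolExecutor


def get(tokens):
    """Hypothetical equivalent of apalis.get: call every token, in order."""
    return [token() for token in tokens]


def square(x):
    return x * x


with ThreadPoolExecutor(max_workers=4) as pool:
    # Submit every call first so the work can overlap...
    tokens = [pool.submit(square, i).result for i in range(5)]
    # ...and only then wait for the results.
    results = get(tokens)

print(results)  # [0, 1, 4, 9, 16]
```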


## Extra Features

### Set Attribute

In [8]:
obj = apalis.Handler(A(2))
obj.expensive(1)()

2

In [9]:
obj.y = 5
obj.expensive(1)()

5

New attributes can also be set:

In [10]:
obj.a = A(1)

In [11]:
obj.a.expensive()()

1

Attributes of attributes can be set as well:

In [12]:
obj.a.y = 3
obj.a.expensive()()

3

### Get Attribute

Use get() to retrieve the value of an attribute. Like a method call, it returns a token:

In [13]:
obj.y.get()()

5
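One way to support obj.y = 5, obj.a.y = 3 and obj.y.get() is a proxy that records the attribute path and forwards reads and writes along it. The AttrProxy class below is a hypothetical, single-process sketch of that idea: the target is a local object rather than one living in a child process, and get() resolves the value eagerly before wrapping it in a token.

```python
class AttrProxy:
    """Hypothetical sketch: forward attribute reads and writes along a path.

    In apalis the target lives in a child process; here it is a local object
    so that the example runs on its own.
    """

    def __init__(self, target, path=()):
        # Bypass our own __setattr__, which forwards to the target.
        object.__setattr__(self, "_target", target)
        object.__setattr__(self, "_path", path)

    def _resolve(self):
        obj = self._target
        for name in self._path:
            obj = getattr(obj, name)
        return obj

    def __getattr__(self, name):
        # Reading an attribute only extends the path; nothing is fetched yet.
        return AttrProxy(self._target, self._path + (name,))

    def __setattr__(self, name, value):
        # Writing resolves the path and sets the attribute on the real object.
        setattr(self._resolve(), name, value)

    def get(self):
        # Resolve now and wrap the value in a token, mirroring obj.y.get()().
        value = self._resolve()
        return lambda: value


class A:
    def __init__(self, y):
        self.y = y


obj = AttrProxy(A(2))
obj.y = 5          # set an existing attribute
obj.a = A(1)       # set a new attribute
obj.a.y = 3        # set an attribute of an attribute
y_value = obj.y.get()()
nested_value = obj.a.y.get()()
print(y_value, nested_value)  # 5 3
```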

### Initialize the object directly in the child process

In [14]:
@apalis.RemoteClass
class G:
    def __init__(self, y):
        # Expensive initialization
        self.y = 0
        for i in range(y):
            self.y += i

In [15]:
g = G(5)
g

<apalis.core.Handler at 0x7f2e2f2e82b0>

## The Group Handler

With the GroupHandler, a list of objects can be distributed over a given number of processes (set with the threads argument).

In [16]:
objs = [A(_) for _ in range(64)]
objsGH = apalis.GroupHandler(objs, threads=16)

The ideal way to use the GroupHandler is to first collect all the tasks the objects need to perform and then execute them together with multiple_run() or run(). Calling a method on an element of the GroupHandler no longer executes it; instead, it returns a task:

In [17]:
objsGH[2].cheap(5)

{'i': 2, 'name': 'cheap', 'args': (5,), 'kwargs': {}, 'mode': 'run'}
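A task is plain data, so executing it amounts to looking up the object by index and the method by name. A minimal sketch, assuming the dict layout shown above and handling only the "run" mode (execute_task is a hypothetical helper, not part of apalis):

```python
def execute_task(objs, task):
    """Hypothetical sketch: run one task dict against a list of local objects.

    'i' selects the object, 'name' the method, 'args'/'kwargs' the call
    arguments. Only the "run" mode is handled here.
    """
    if task.get("mode", "run") != "run":
        raise ValueError(f"unsupported mode: {task['mode']!r}")
    method = getattr(objs[task["i"]], task["name"])
    return method(*task.get("args", ()), **task.get("kwargs", {}))


class A:
    def __init__(self, y):
        self.y = y

    def cheap(self, x=1):
        return x * self.y


objs = [A(y) for y in range(4)]
task = {'i': 2, 'name': 'cheap', 'args': (5,), 'kwargs': {}, 'mode': 'run'}
value = execute_task(objs, task)
print(value)  # 10
```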

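The collected tasks can then be executed with multiple_run(), which returns a token. As the printed list shows, it also adds task_id and item_number fields to the task dicts: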

In [18]:
tasks = [objsGH[i].cheap(5) for i in range(2)]
token = objsGH.multiple_run(tasks)
print(tasks)
token()

[{'i': 0, 'name': 'cheap', 'args': (5,), 'kwargs': {}, 'mode': 'run', 'task_id': 0, 'item_number': 0}, {'i': 1, 'name': 'cheap', 'args': (5,), 'kwargs': {}, 'mode': 'run', 'task_id': 0, 'item_number': 1}]


[0, 5]
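The benefit of multiple_run() comes from sending each worker its whole batch of tasks at once instead of one message per call. The sketch below shows that batching with threads; the round-robin placement of object i on worker i % n_workers and the helper name multiple_run_sketch are assumptions for illustration, not apalis's internals.

```python
from concurrent.futures import ThreadPoolExecutor


def multiple_run_sketch(objs, tasks, n_workers):
    """Hypothetical sketch of per-worker batching (not apalis's internals).

    Object i is assumed to live on worker i % n_workers. Each worker gets its
    whole batch in one submission; results come back in the original order.
    """
    batches = [[] for _ in range(n_workers)]
    for position, task in enumerate(tasks):
        batches[task["i"] % n_workers].append((position, task))

    def run_batch(batch):
        out = []
        for position, task in batch:
            method = getattr(objs[task["i"]], task["name"])
            out.append((position, method(*task.get("args", ()), **task.get("kwargs", {}))))
        return out

    results = [None] * len(tasks)
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        for batch_result in pool.map(run_batch, batches):
            for position, value in batch_result:
                results[position] = value
    return results


class A:
    def __init__(self, y):
        self.y = y

    def cheap(self, x=1):
        return x * self.y


objs = [A(y) for y in range(8)]
tasks = [{'i': i, 'name': 'cheap', 'args': (5,), 'mode': 'run'} for i in range(8)]
results = multiple_run_sketch(objs, tasks, n_workers=3)
print(results)  # [0, 5, 10, 15, 20, 25, 30, 35]
```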

Let us time the function:

In [19]:
%%timeit
objsGH.multiple_run([obj.cheap(5) for obj in objsGH])()

455 µs ± 14.2 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


If more speed is needed, the tasks can be created manually:

In [20]:
%%timeit
objsGH.multiple_run([{'i': i, 'name': "cheap", 'args': (5,), "mode":"run"} for i, obj in enumerate(objs)])()

412 µs ± 24.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


Compared with one Handler per object, the GroupHandler is about four times faster here (455 µs versus 1.79 ms for 64 calls):

In [21]:
objsH = [apalis.Handler(obj) for obj in objs]

In [22]:
%%timeit
apalis.get([obj.cheap(5) for obj in objsH])

1.79 ms ± 53.2 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
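Dividing the timings above by the 64 calls gives the per-call cost of each approach. A quick calculation using only the numbers reported in this notebook:

```python
n_calls = 64
# Mean time per loop reported above, in seconds.
timings = {
    "GroupHandler (tasks via proxy)": 455e-6,
    "GroupHandler (tasks built by hand)": 412e-6,
    "one Handler per object": 1.79e-3,
}

per_call_us = {name: t / n_calls * 1e6 for name, t in timings.items()}
for name, us in per_call_us.items():
    print(f"{name}: {us:.1f} µs per call")

speedup = timings["one Handler per object"] / timings["GroupHandler (tasks via proxy)"]
print(f"speedup: {speedup:.1f}x")
```

This works out to roughly 7 µs per call with the GroupHandler against roughly 28 µs per call with individual Handlers.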


Note that several tasks can also be sent to a single Handler with the multiple_run syntax:

In [23]:
objsH[0].multiple_run([{'name': "cheap", 'args': (i,), "mode":"run"} for i in range(10)])()

[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

### run

A bit more speed can be achieved by not generating a token and instead waiting directly for the response of the child processes:

In [24]:
%%timeit
objsGH.run([{'i': i, 'name': "cheap", 'args': (5,), "mode":"run"} for i, obj in enumerate(objs)])

358 µs ± 9.37 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


The same syntax also works for the Handler; here a single task without the 'i' key is passed:

In [25]:
objsH[0].run({'name': "cheap", 'args': (1,), "mode":"run"})

0

## More Functionality

### single_run

The function *single_run* executes exactly one task. It works for both the GroupHandler and the Handler.

In [26]:
objsGH.single_run({'i':1, 'name': "cheap", 'args': (1,), "mode": "run"})()

1

In [27]:
objsH[1].single_run({'name': "cheap", 'args': (1,), "mode": "run"})()

1

It is marginally faster than multiple_run() with a one-element list:

In [28]:
%%timeit
objsGH.single_run({'i':1, 'name': "cheap", 'args': (1,), "mode": "run"})()

43.3 µs ± 1.33 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)


In [29]:
%%timeit
objsGH.multiple_run([{'i':1, 'name': "cheap", 'args': (1,), "mode": "run"}])()

44.5 µs ± 955 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)


### eval, exec, copy, get_attr, set_attr

The "eval" mode evaluates an expression in the child process, where self refers to the wrapped object. It is equivalent to eval().

In [30]:
objsH[1].single_run({'code': "self.cheap(1) * 5", "mode": "eval"})()

5

The "exec" mode executes a statement in the child process. It is equivalent to exec().

In [31]:
objsH[1].single_run({'code': "self.a = self.cheap(1) * 5", "mode": "exec"})()
objsH[1].a.get()()

5

The "copy" mode returns a copy of the object. Before the copy is returned, the code is executed, with the copy available as self_copy. This is useful when the object has unpicklable attributes, since they can be deleted from the copy before it is sent back.

In [32]:
class B:
    def __init__(self):
        self.lambda_function = lambda x: x
        
obj = apalis.Handler(B())

In [33]:
b_copy = obj.single_run({'code': "del self_copy.lambda_function", "mode": "copy"})()
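Why the copy mode helps: an object whose state contains a lambda cannot be pickled, but a shallow copy with that attribute deleted can, while the original keeps it. A standard-library sketch of that step; it only mirrors what the code string does and assumes nothing about how apalis transfers the copy.

```python
import copy
import pickle


class B:
    def __init__(self):
        self.lambda_function = lambda x: x


obj = B()

# pickle stores an instance as a reference to its class plus its state
# (the __dict__); a lambda in that state cannot be pickled.
try:
    pickle.dumps(vars(obj))
    picklable_before = True
except (pickle.PicklingError, AttributeError, TypeError):
    picklable_before = False

# Equivalent of the code string "del self_copy.lambda_function":
# a shallow copy gets its own __dict__, so the original is untouched.
self_copy = copy.copy(obj)
del self_copy.lambda_function
payload = pickle.dumps(vars(self_copy))  # the remaining state pickles fine

print(picklable_before, hasattr(obj, "lambda_function"))  # False True
```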

The "set_attr" mode sets an attribute. The call below is equivalent to objsH[1].y = 5.

In [34]:
objsH[1].single_run({'name': "y", "value": 5, "mode": "set_attr"})()

The "get_attr" mode gets an attribute. The call below is equivalent to objsH[1].y.get()().

In [35]:
objsH[1].single_run({'name': "y", "mode": "get_attr"})()

5
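The modes above can be summarised as a dispatcher over the task dict. The function below is a hypothetical, single-process sketch that reproduces the behaviour of the examples in this section; it assumes that code strings see the object as self ("eval", "exec") and the copy as self_copy ("copy"), and it is not apalis's implementation.

```python
import copy


def dispatch(obj, task):
    """Hypothetical single-process sketch of the task modes shown above.

    Assumes code strings see the object as `self` ("eval", "exec") and the
    copy as `self_copy` ("copy"); apalis runs the equivalent in the child.
    """
    mode = task["mode"]
    if mode == "run":
        method = getattr(obj, task["name"])
        return method(*task.get("args", ()), **task.get("kwargs", {}))
    if mode == "eval":
        return eval(task["code"], {"self": obj})
    if mode == "exec":
        exec(task["code"], {"self": obj})
        return None
    if mode == "copy":
        self_copy = copy.copy(obj)
        exec(task["code"], {"self_copy": self_copy})
        return self_copy
    if mode == "set_attr":
        setattr(obj, task["name"], task["value"])
        return None
    if mode == "get_attr":
        return getattr(obj, task["name"])
    raise ValueError(f"unknown mode: {mode!r}")


class A:
    def __init__(self, y):
        self.y = y

    def cheap(self, x=1):
        return x * self.y


a = A(1)
evaluated = dispatch(a, {'code': "self.cheap(1) * 5", "mode": "eval"})
dispatch(a, {'code': "self.a = self.cheap(1) * 5", "mode": "exec"})  # sets a.a
dispatch(a, {'name': "y", "value": 5, "mode": "set_attr"})           # sets a.y
fetched = dispatch(a, {'name': "y", "mode": "get_attr"})
print(evaluated, a.a, fetched)  # 5 5 5
```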

### Initialize directly in the child process (Group Handler)

The first argument is a list of classes that will be sent to the child processes. The params argument is a list with one entry per class, used to instantiate it; each entry must have the format [args, kwargs].

In [36]:
classes = [A for i in range(16)]
params = [[(i,), {}] for i in range(16)]
gh = apalis.GroupHandler(classes, params=params)
gh

<apalis.core.GroupHandler at 0x7f2e2f2a3a30>

In [37]:
gh.single_run(gh[2].cheap(1))()

2
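The [args, kwargs] format maps directly onto cls(*args, **kwargs). A minimal local sketch with a hypothetical instantiate helper; in apalis the construction happens in the child processes instead.

```python
class A:
    def __init__(self, y):
        self.y = y

    def cheap(self, x=1):
        return x * self.y


def instantiate(classes, params):
    """Hypothetical sketch: build each object from its [args, kwargs] pair."""
    if len(classes) != len(params):
        raise ValueError("need exactly one [args, kwargs] entry per class")
    return [cls(*args, **kwargs) for cls, (args, kwargs) in zip(classes, params)]


classes = [A for i in range(16)]
params = [[(i,), {}] for i in range(16)]
objs = instantiate(classes, params)
print(objs[2].cheap(1))  # 2
```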

### CPU Affinity

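For fast workloads ($t < 1$ ms), setting the CPU affinity of the processes (restricting which CPUs each one may run on) can make things faster. In the example below it reduces the time from 1.61 ms to 1.09 ms.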

In [39]:
objs = [A(_) for _ in range(16)]

objsGH = apalis.GroupHandler(objs, threads=16, affinity=False)
objsGH_affinity = apalis.GroupHandler(objs, threads=16, affinity=True)

In [40]:
%%timeit
objsGH.multiple_run([{'i': i, 'name': "fast", 'args': (10 ** 4,), "mode":"run"} for i in range(16)])()

1.61 ms ± 25.8 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [41]:
%%timeit
objsGH_affinity.multiple_run([{'i': i, 'name': "fast", 'args': (10**4,), "mode":"run"} for i in range(16)])()

1.09 ms ± 33.9 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
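On Linux, a process can restrict itself to particular CPUs with os.sched_setaffinity. The sketch below assumes a round-robin policy that gives worker k one of the available CPUs; both the policy and the helper names are hypothetical, and the pinning call is guarded because it is not available on macOS or Windows. The example only computes the plan; each worker would call pin_current_process once at start-up.

```python
import os


def assign_cpus(n_workers, available_cpus):
    """Hypothetical round-robin policy: worker k gets the k-th CPU, wrapping around."""
    cpus = sorted(available_cpus)
    return [cpus[k % len(cpus)] for k in range(n_workers)]


def pin_current_process(cpu):
    """Restrict the calling process to a single CPU (Linux only, hence the guard)."""
    if hasattr(os, "sched_setaffinity"):
        os.sched_setaffinity(0, {cpu})
        return True
    return False


if hasattr(os, "sched_getaffinity"):
    available = os.sched_getaffinity(0)  # CPUs this process may run on
else:
    available = range(os.cpu_count() or 1)

plan = assign_cpus(16, available)
# Each worker k would call pin_current_process(plan[k]) once at start-up.
print(plan)
```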


### Sending Large Objects with Plasma

In [42]:
import numpy as np

Create a Plasma object store with 2000 MB of memory (note the -m 2000000000 flag in the output):

In [43]:
apalis.init_plasma(mem=2000)

/home/danielalcalde/anaconda3/envs/python3.8/bin/plasma_store -m 2000000000 -s /tmp/plasma_auBfeZvrVU


<pyarrow._plasma.PlasmaClient at 0x7f2e2d659670>

Store an array in Plasma. put() returns an ObjectID:

In [44]:
oid = apalis.put(np.zeros(10))
oid

ObjectID(92d6aacd66fa94adb1f6bf85be06d54be721a670)

Retrieve it from Plasma:

In [45]:
apalis.plasma_get(oid)

array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])

Let's test it out.

In [46]:
class C:
    def __init__(self):
        self.v = np.random.randn(3000)
    
    def multi(self, m):
        return m.dot(self.v)

In [47]:
objs = [apalis.Handler(C()) for _ in range(16)]
objs2 = [C() for _ in range(16)]

Without using Plasma:

In [48]:
%%timeit
m = np.random.randn(3000, 3000)
apalis.get([obj.multi(m) for obj in objs])

3.72 s ± 74 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


With Plasma:

In [49]:
%%timeit
m = np.random.randn(3000, 3000)
m_id = apalis.put(m)
apalis.get([obj.multi(m_id) for obj in objs])

691 ms ± 32.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


With shared arrays:

In [50]:
%%timeit
m = apalis.SharedArray(np.random.randn(3000, 3000))
apalis.get([obj.multi(m) for obj in objs])
m.unlink()

732 ms ± 43.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
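A rough size estimate helps explain the gap. Assuming that, without a shared store, each of the 16 calls serializes its own copy of m (each Handler is a separate process), while with Plasma or a SharedArray the matrix is written to shared memory once and only a small reference travels with each call, the data moved per loop compares as follows:

```python
# m = np.random.randn(3000, 3000) is a float64 array: 8 bytes per entry.
rows, cols, bytes_per_entry = 3000, 3000, 8
matrix_bytes = rows * cols * bytes_per_entry  # 72,000,000 bytes = 72 MB

n_handlers = 16
# Without a shared store, each of the 16 calls ships its own copy of m.
copied_bytes = matrix_bytes * n_handlers      # 1,152,000,000 bytes, about 1.15 GB

print(f"{matrix_bytes / 1e6:.0f} MB per copy, {copied_bytes / 1e9:.2f} GB per loop")
```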


### Shared Arrays (Experimental)

In [51]:
class C:
    def __init__(self, x):
        self.x = x
    
    def multi(self, m):
        m[0] = self.x

In [52]:
obj = apalis.Handler(C(10))

Create a mutable shared array:

In [53]:
a = apalis.SharedArray(np.zeros(10))
a

SharedArray([0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])

The array is modified by multi in the child process, and the modification is visible in the parent:

In [54]:
obj.multi(a)()
a

SharedArray([10.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.])

To release the shared memory:

In [55]:
a.unlink()
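Shared arrays of this kind can be built on operating-system shared memory. The sketch below uses the standard library's multiprocessing.shared_memory with a memoryview of doubles instead of NumPy, and attaches a second handle by name within one process to stand in for the child. Whether apalis.SharedArray works this way internally is an assumption.

```python
from multiprocessing import shared_memory

n = 10
# A block large enough for 10 float64 values ("d" is the C double format).
shm = shared_memory.SharedMemory(create=True, size=n * 8)
writer = shm.buf.cast("d")
for k in range(n):
    writer[k] = 0.0

# A second handle attached by name, the way a child process would attach.
other = shared_memory.SharedMemory(name=shm.name)
reader = other.buf.cast("d")

writer[0] = 10.0                          # modify through one handle...
snapshot = [reader[k] for k in range(n)]  # ...and read the change through the other
print(snapshot)  # [10.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]

# Views must be released before the handles can be closed.
reader.release()
writer.release()
other.close()
shm.close()
shm.unlink()  # the counterpart of a.unlink(): free the block itself
```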