# Lithops multiprocessing API

### Ejemplo sencillo de multiprocessing

In [2]:
from multiprocessing import Pool

In [3]:
def f(x):
    """Return ``x`` multiplied by itself."""
    squared = x * x
    return squared


In [4]:
# Map f over three inputs using a pool created with Pool(5), then print
# the list of results returned by map().
with Pool(5) as pool:
    squares = pool.map(f, [1, 2, 3])
    print(squares)

[1, 4, 9]


 ### Ahora con Lithops...

In [5]:
from lithops.multiprocessing import Pool


In [6]:
# Same call as the local example; the only difference is that Pool was
# re-imported from lithops.multiprocessing in the preceding cell.
with Pool(5) as pool:
    squares = pool.map(f, [1, 2, 3])
    print(squares)

2022-10-01 16:07:33,246 [INFO] lithops.config -- Lithops v2.6.0
2022-10-01 16:07:33,380 [INFO] lithops.storage.backends.aws_s3.aws_s3 -- S3 client created - Region: eu-central-1
2022-10-01 16:07:33,844 [INFO] lithops.serverless.backends.aws_lambda.aws_lambda -- AWS Lambda client created - Region: eu-central-1
2022-10-01 16:07:33,846 [INFO] lithops.invokers -- ExecutorID 1a848e-0 | JobID M000 - Selected Runtime: gfinol/python3.10:3.0 - 1769MB
2022-10-01 16:07:33,931 [INFO] lithops.invokers -- ExecutorID 1a848e-0 | JobID M000 - Starting function invocation: cloud_process_wrapper() - Total: 3 activations
2022-10-01 16:07:33,934 [INFO] lithops.invokers -- ExecutorID 1a848e-0 | JobID M000 - View execution logs at /tmp/lithops/logs/1a848e-0-M000.log
2022-10-01 16:07:33,935 [INFO] lithops.wait -- ExecutorID 1a848e-0 - Getting results from 3 function activations


    0%|          | 0/3  

2022-10-01 16:07:37,087 [INFO] lithops.executors -- ExecutorID 1a848e-0 - Cleaning temporary data


[1, 4, 9]


### Pero multiprocessing tiene más abstracciones: Queue, Process, Pipes...

In [7]:
import multiprocessing as mp

def add_to_queue(queue):
    """Put the single message ``'hola'`` on ``queue``."""
    message = 'hola'
    queue.put(message)

# Hand a queue to a child process and print the message it puts on it.
queue = mp.Queue()
producer = mp.Process(target=add_to_queue, args=(queue,))
producer.start()
# Read from the queue before joining the child.
message = queue.get()
print(message)
producer.join()

hola


## ¡Lithops las soporta!

In [8]:
import lithops.multiprocessing as mp

# Same flow as the local example, with mp now bound to
# lithops.multiprocessing; add_to_queue is reused from the earlier cell.
queue = mp.Queue()
producer = mp.Process(target=add_to_queue, args=(queue,))
producer.start()
# Read from the queue before joining the process.
message = queue.get()
print(message)
producer.join()

2022-10-01 16:08:23,739 [INFO] lithops.config -- Lithops v2.6.0
2022-10-01 16:08:23,752 [INFO] lithops.storage.backends.aws_s3.aws_s3 -- S3 client created - Region: eu-central-1
2022-10-01 16:08:24,284 [INFO] lithops.serverless.backends.aws_lambda.aws_lambda -- AWS Lambda client created - Region: eu-central-1
2022-10-01 16:08:24,289 [INFO] lithops.invokers -- ExecutorID 1a848e-1 | JobID A000 - Selected Runtime: gfinol/python3.10:3.0 - 1769MB
2022-10-01 16:08:24,399 [INFO] lithops.invokers -- ExecutorID 1a848e-1 | JobID A000 - Starting function invocation: cloud_process_wrapper() - Total: 1 activations
2022-10-01 16:08:24,400 [INFO] lithops.invokers -- ExecutorID 1a848e-1 | JobID A000 - View execution logs at /tmp/lithops/logs/1a848e-1-A000.log
2022-10-01 16:08:24,804 [INFO] lithops.wait -- ExecutorID 1a848e-1 - Waiting for 100% of 1 function activations to complete


hola


    0%|          | 0/1  

2022-10-01 16:08:26,824 [INFO] lithops.executors -- ExecutorID 1a848e-1 - Cleaning temporary data


## Veamos un ejemplo con listas y diccionarios compartidos

In [9]:
from multiprocessing import Process, Manager

def f(d, l):
    """Store three fixed entries in ``d`` and call ``l.reverse()``.

    The keys ``'1'``, ``'2'`` and ``'0.25'`` are assigned, in that order,
    the values ``1``, ``2`` and ``None``.
    """
    entries = (('1', 1), ('2', 2), ('0.25', None))
    for key, value in entries:
        d[key] = value
    l.reverse()

# Create a manager-backed dict and list, let a child process modify them
# through f, and print their contents once the child has finished.
with Manager() as manager:
    shared_dict = manager.dict()
    shared_list = manager.list(range(10))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

    worker = Process(target=f, args=(shared_dict, shared_list))
    worker.start()
    worker.join()

    # Both prints stay inside the with-block, while the manager is still open.
    print(shared_dict.items())
    print(shared_list[:])

[('1', 1), ('2', 2), ('0.25', None)]
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]


In [10]:
from lithops.multiprocessing import Process, Manager

# Same flow as the local example, using Process and Manager imported from
# lithops.multiprocessing; f(d, l) is reused from the earlier cell.
with Manager() as manager:
    shared_dict = manager.dict()
    shared_list = manager.list(range(10))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

    worker = Process(target=f, args=(shared_dict, shared_list))
    worker.start()
    worker.join()

    # Both prints stay inside the with-block, while the manager is still open.
    print(shared_dict.items())
    print(shared_list[:])

2022-10-01 16:09:10,434 [INFO] lithops.config -- Lithops v2.6.0
2022-10-01 16:09:10,446 [INFO] lithops.storage.backends.aws_s3.aws_s3 -- S3 client created - Region: eu-central-1
2022-10-01 16:09:10,922 [INFO] lithops.serverless.backends.aws_lambda.aws_lambda -- AWS Lambda client created - Region: eu-central-1
2022-10-01 16:09:10,928 [INFO] lithops.invokers -- ExecutorID 1a848e-2 | JobID A000 - Selected Runtime: gfinol/python3.10:3.0 - 1769MB
2022-10-01 16:09:11,010 [INFO] lithops.invokers -- ExecutorID 1a848e-2 | JobID A000 - Starting function invocation: cloud_process_wrapper() - Total: 1 activations
2022-10-01 16:09:11,012 [INFO] lithops.invokers -- ExecutorID 1a848e-2 | JobID A000 - View execution logs at /tmp/lithops/logs/1a848e-2-A000.log
2022-10-01 16:09:11,013 [INFO] lithops.wait -- ExecutorID 1a848e-2 - Waiting for 100% of 1 function activations to complete


    0%|          | 0/1  

2022-10-01 16:09:14,034 [INFO] lithops.executors -- ExecutorID 1a848e-2 - Cleaning temporary data


[('1', 1), ('2', 2), ('0.25', None)]
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]


## Veamos un ejemplo con Values y su consistencia 

In [11]:
from multiprocessing import Pool, Value

def sync_incr(proc_id):
    """Add 1 to the module-level shared value ``v`` ten times.

    Each increment is performed while holding the lock returned by
    ``v.get_lock()``. ``proc_id`` is not used by the body; it is the
    element passed in when the function is used with a pool's map call.
    """
    done = 0
    while done < 10:
        with v.get_lock():
            v.value += 1
        done += 1


# Shared integer ('i' typecode) that sync_incr reads as the global `v`.
v = Value('i')
print(f"El valor inicial de v es {v.value}")

# Run sync_incr once per id ("0".."3") and wait for every call to finish.
with Pool() as pool:
    proc_ids = list(map(str, range(4)))
    pending = pool.map_async(sync_incr, proc_ids)
    pool.close()
    pending.get()
    pool.join()

print(f"El valor final de v es {v.value}")

El valor inicial de v es 0
El valor final de v es 40


In [12]:
from lithops.multiprocessing import Pool, Value

# Shared integer ('i' typecode), now created with the Value imported from
# lithops.multiprocessing; sync_incr (defined in an earlier cell) reads it
# as the global `v`.
v = Value('i')
print(f"El valor inicial de v es {v.value}")

# Run sync_incr once per id ("0".."3") and wait for every call to finish.
with Pool() as pool:
    proc_ids = list(map(str, range(4)))
    pending = pool.map_async(sync_incr, proc_ids)
    pool.close()
    pending.get()
    pool.join()

print(f"El valor final de v es {v.value}")

2022-10-01 16:09:58,790 [INFO] lithops.config -- Lithops v2.6.0
2022-10-01 16:09:58,802 [INFO] lithops.storage.backends.aws_s3.aws_s3 -- S3 client created - Region: eu-central-1


El valor inicial de v es 0


2022-10-01 16:09:59,277 [INFO] lithops.serverless.backends.aws_lambda.aws_lambda -- AWS Lambda client created - Region: eu-central-1
2022-10-01 16:09:59,279 [INFO] lithops.invokers -- ExecutorID 1a848e-3 | JobID M000 - Selected Runtime: gfinol/python3.10:3.0 - 1769MB
2022-10-01 16:09:59,370 [INFO] lithops.invokers -- ExecutorID 1a848e-3 | JobID M000 - Starting function invocation: cloud_process_wrapper() - Total: 4 activations
2022-10-01 16:09:59,375 [INFO] lithops.invokers -- ExecutorID 1a848e-3 | JobID M000 - View execution logs at /tmp/lithops/logs/1a848e-3-M000.log
2022-10-01 16:09:59,377 [INFO] lithops.wait -- ExecutorID 1a848e-3 - Getting results from 4 function activations


    0%|          | 0/4  

2022-10-01 16:10:03,404 [INFO] lithops.executors -- ExecutorID 1a848e-3 - Cleaning temporary data


El valor final de v es 40
