In [1]:
from __future__ import print_function
import builtins as __builtin__
def print(*args, **kwargs):
    return __builtin__.print(*args, **{**kwargs, 'flush':True })

# Paralelismo e concorrência em Python
### O que ele faz? Ele faz coisas? Vamos descobrir!

## Part 0 - Um problema de interpretação

### Global Interpreter Lock

[Fonte diss'aqui tudo sobre o GIL](https://realpython.com/python-gil/)

1. O GIL é um Lock que controla o interpretador do Python.
2. Ele só permite 1 thread usar o interpretador por vez.
3. Ele foi criado pra resolver problemas de contagem de referência de variáveis.

In [2]:
import sys


foo = [] 
print(sys.getrefcount(foo))
foo.append(5)
print(sys.getrefcount(foo))
del foo

2
2


In [3]:
alice = 8
print(sys.getrefcount(alice))
del alice
alice = 4846848648468
print(sys.getrefcount(alice))
del alice
alice = {}
print(sys.getrefcount(alice))
del alice

325
3
2


In [4]:
burro = 'já chegou chureque'
print(sys.getrefcount(burro))
print(sys.getrefcount(burro))
print(sys.getrefcount(burro))
print(sys.getrefcount(burro))
pass

3
3
3
3


In [5]:
aurelio = dict()
print(sys.getrefcount(aurelio))
aurelio['ornitorrinco'] = 'é um vegetal'
print(sys.getrefcount(aurelio))
print(sys.getrefcount(aurelio['ornitorrinco']))

2
2
3


In [6]:
bar = []
print(sys.getrefcount(bar))
bar = {}
print(sys.getrefcount(bar))
del bar

2
2


In [7]:
uno = []
dos = uno
tres = dos
cuatro = tres
print(sys.getrefcount(cuatro))
print(sys.getrefcount(uno))

5
5


4. Isso não significa que as thread não sejam reais. Só que elas ficam com a execução pausada caso elas precisem do interpretador.

### Quando usar multithreading no python então?
#### *Quando as tarefas são I/O bounded ou não precisam reinterpretar código.*

Da documentação:  
"*However, threading is still an appropriate model if you want to run multiple I/O-bound tasks simultaneously.*"

## Act 1: Concurrency
Requirements:
- python >= 3.3

1. A biblioteca de concorrência de python implementa um event loop single threaded com async/await

In [8]:
import asyncio

async def a():
    print("ba")

@asyncio.coroutine #deprecated 
def b():
    print("ca")

async def start():
    print('a')
    await asyncio.gather(a(),b()) #basicamente "Promise.all". por algum motivo não é uma lista aqui...
    print('xi')
    return '!'
    
print(*await asyncio.gather(start()))

a
ba
ca
xi
!


  def b():


2. No python, existe um tipo chamado Awaitable que inclui corrotinas e Futures

In [9]:
from typing import Awaitable

loop = asyncio.get_event_loop()
x = loop.create_future()
async def test():
    return

print('Future ----------------------------')
print(type(x), str(x))
print(isinstance(x, Awaitable))
print(x.__await__)

print('Function ----------------------------')
print(type(test), str(test))
print(isinstance(test, Awaitable))

print('Coroutine ----------------------------')
print(type(test()), str(test()))
print(isinstance(test(), Awaitable))
print(test().__await__)

Future ----------------------------
<class '_asyncio.Future'> <Future pending>
True
<method-wrapper '__await__' of _asyncio.Future object at 0x7f06b095f840>
Function ----------------------------
<class 'function'> <function test at 0x7f06b00d7af0>
False
Coroutine ----------------------------
<class 'coroutine'> <coroutine object test at 0x7f06b00ea140>
True
<method-wrapper '__await__' of coroutine object at 0x7f06b00ea2c0>


  print(type(test()), str(test()))
  print(isinstance(test(), Awaitable))
  print(test().__await__)


## Act 2: Threading

Requirements:  
- python >= 2.7  
( para python < 3.7, verifique a documentação pq tem umas treta )

In [10]:
import threading

make_cpu_hot = False
size = 300*1000*1000 if make_cpu_hot else 999

def f():
    out = str(threading.currentThread().getName())
    out += ' %d' % (sum(range(0,size)) % 23)
    print(out)

def main():
    t1 = threading.Thread(target=f, name='Bar')          
    t2 = threading.Thread(target=f, name='ba')
    t3 = threading.Thread(target=f, name='dos')
    t1.start()
    t2.start()
    t3.start()
 
    t1.join()
    t2.join()
    t3.join()
if __name__ == '__main__':
    main()

Bar 22
ba 22
dos 22


with make_cpu_how == True
![ONE CPU BURNING](single-cpu.png)

### Módulos externos
1. Algums módulos como o Numpy implementam suas estruturas de dados e métodos pré compilados em C. Como o interpretador só precisa invocar o módulo e consegue ficar livre para executar outra thread, as mesmas conseguem rodar em paralelo com mais facilidade!

In [11]:
import threading
import numpy as np

make_cpu_hot = False
size = 300*1000*1000 if make_cpu_hot else 999

def f():
    out = str(threading.currentThread().getName())
    out += ' %d' % np.mod(np.sum(np.arange(0,size)),23)
    print(out)

def main():
    t1 = threading.Thread(target=f, name='Bar')          
    t2 = threading.Thread(target=f, name='ba')
    t3 = threading.Thread(target=f, name='dos')
    t1.start()
    t2.start()
    t3.start()
 
    t1.join()
    t2.join()
    t3.join()
if __name__ == '__main__':
    main()

Bar 22
ba 22
dos 22


**WARNING: A LOT OF RAM**  
with make_cpu_how == True
![ONE CPU BURNING](multi-cpu.png)

### Dicas práticas


In [12]:
from concurrent.futures import ThreadPoolExecutor, as_completed
import numpy as np

def f(X):
    return np.power(X, 2)

if __name__ == '__main__':
    X = [np.random.randint(20000) for i in range(10)]
    
    with ThreadPoolExecutor(max_workers=3) as executor:
        future_set = {executor.submit(f, x): x
                      for x in X}
        
    for future in as_completed(future_set):
        print(future_set[future], 'squared is', future.result())

16024 squared is 256768576
7147 squared is 51079609
1361 squared is 1852321
9444 squared is 89189136
9449 squared is 89283601
17280 squared is 298598400
9561 squared is 91412721
5025 squared is 25250625
15835 squared is 250747225
6094 squared is 37136836


## Act 3: Multiprocessing

1. O módulo de multiprocessing resolve o problema do GLI criando um interpretador pra cada processo. Isso gasta mais memória e mais CPU e tem um overhead de inicialização maior. Mas é o método mais recomendado pra tarefas gerais se você não precisa extrair cada gota de performance e só quer paralelizar rápido.

In [13]:
from multiprocessing import Process, current_process

make_cpu_hot = False
size = 10*1000*1000*1000 if make_cpu_hot else 999

def f():
    out = str(current_process().name)
    out += ' %d' % (sum(range(0,size)) % 23)
    print(out)

def main():
    t1 = Process(target=f, name='Bar')          
    t2 = Process(target=f, name='ba')
    t3 = Process(target=f, name='dos')
    t1.start()
    t2.start()
    t3.start()
 
    t1.join()
    t2.join()
    t3.join()
if __name__ == '__main__':
    main()

Bar 22
ba 22
dos 22


with make_cpu_how == True
![ONE CPU BURNING](multi-process.png)

## Dicas práticas
1. Pool simplificado em 3 passos fáceis

In [14]:
# 1 importa
from multiprocessing import Pool, cpu_count

def f(X):
    return np.power(X, 2)
X = [np.random.randint(20000) for i in range(10)]

if __name__ == '__main__':    
    # 2 instancia
    with Pool(processes=cpu_count()) as p:
        # 3 map
        Y = p.map(f, X)

for x,y in list(zip(X, Y)):
    print(x, 'squared is', y)

6250 squared is 39062500
6198 squared is 38415204
4036 squared is 16289296
5930 squared is 35164900
19135 squared is 366148225
7031 squared is 49434961
19447 squared is 378185809
13000 squared is 169000000
2361 squared is 5574321
10680 squared is 114062400


2. concurrent.futures também tem uma interface pra multiprocessing! E os métodos são os mesmos que pra threads!

In [15]:
from concurrent.futures import ProcessPoolExecutor, as_completed
import numpy as np

def f(X):
    return np.power(X, 2)

if __name__ == '__main__':
    X = [np.random.randint(20000) for i in range(10)]
    
    with ProcessPoolExecutor(max_workers=3) as executor:
        future_set = {executor.submit(f, x): x
                      for x in X}
        
    for future in as_completed(future_set):
        print(future_set[future], 'squared is', future.result())

18289 squared is 334487521
16427 squared is 269846329
19469 squared is 379041961
8560 squared is 73273600
10179 squared is 103612041
10484 squared is 109914256
16816 squared is 282777856
8328 squared is 69355584
1405 squared is 1974025
18631 squared is 347114161


## Conclusão

O foco de python é ser amigável pra escrever e entender, dá pra resolver de todo tipo de problema, mas nem sempre é a melhor ferramenta. Mas na maior parte das vezes é bom o suficiente se você observar esses detalhes.

Nesse notebook eu poderia cobrir Dask, Spark e outras ferramentas que também tem implementações de paralelismo, mas não rolou tempo! xP