# Threads, MultiProcess e AsyncIO

## Threads
As *threads* permitem a execução de código de forma concorrente (compartilhando o tempo do processador). Segue um exemplo simples. Primeiro, a versão sequencial.

In [1]:
# Versão sequencial
import time
def tarefa(ini,fim):
    for i in range(ini,fim):
        print(f'De {ini} a {fim}. Valor atual: {i}')
        time.sleep(1)
        

tarefa(1,3)
# a primeira chamada deve terminar antes de 
# começar a execução da segunda chamada
tarefa(4,6)    

De 1 a 3. Valor atual: 1
De 1 a 3. Valor atual: 2
De 4 a 6. Valor atual: 4
De 4 a 6. Valor atual: 5


No código a seguir, cada chamada à função `tarefa` é executada em uma thread diferente. Assim, enquanto uma das tarefas está dormindo (`sleep`), a outra pode ser executada. 

In [3]:
# Versão concorrente utilizando Threads 
import threading

# Criar as threads
# Mais um exemplo de uma função de ordem superior!
T1 = threading.Thread(target=tarefa, args=(1,5))
T2 = threading.Thread(target=tarefa, args=(6,10))

# Neste ponto, a execução da função ainda não começou 
print ('Antes de iniciar')
# Iniciar a execução 
T1.start()
T2.start()

print("Depois de T1 e T2")

Antes de iniciar
De 1 a 5. Valor atual: 1
De 6 a 10. Valor atual: 6
Depois de T1 e T2
De 1 a 5. Valor atual: 2
De 6 a 10. Valor atual: 7
De 1 a 5. Valor atual: 3De 6 a 10. Valor atual: 8

De 1 a 5. Valor atual: 4De 6 a 10. Valor atual: 9



Note que a mensagem "Depois de T1 e T2" não aparece (necessariamente) depois de terminar T1 e T2. De fato: `T1.start` inicializa a thread e permite que a próxima instrução seja executada 

Para _esperar_ pela terminação de uma thread, podemos utilizar `join`:

In [4]:
import threading
# Criar as threads
T1 = threading.Thread(target=tarefa, args=(1,5))
T2 = threading.Thread(target=tarefa, args=(6,10))

# Iniciar a execução 
T1.start()
T2.start()

# Esperar por terminação
T1.join()
T2.join()
print("Depois de T1 e T2")

De 1 a 5. Valor atual: 1
De 6 a 10. Valor atual: 6
De 1 a 5. Valor atual: 2
De 6 a 10. Valor atual: 7
De 1 a 5. Valor atual: 3
De 6 a 10. Valor atual: 8
De 1 a 5. Valor atual: 4
De 6 a 10. Valor atual: 9
Depois de T1 e T2


## Recursos compartilhados 
Com grandes poderes vêm grandes responsabilidades:

In [7]:
import threading
# Variável global (compartilhada pelas 2 threads)
soma = 0

def calcularsoma(step,n ):
    ''' calcular sum +=  step + step + step ...  n vezes '''
    global soma # considerar a variável externa/global
    for i in range(n):
        soma = soma + step

# +1 1000000 vezes e -1 1000000 vezes
sum1 = threading.Thread(target=calcularsoma, args=(1,1000000,))
sum2 = threading.Thread(target=calcularsoma, args=(-1,1000000,))

# Versão seqüencial
sum1.start()
sum1.join()
sum2.start()
sum2.join()

print(f'Resultado seq: {soma}')

soma = 0 
# Versão concorrente 
sum1 = threading.Thread(target=calcularsoma, args=(1,1000000,))
sum2 = threading.Thread(target=calcularsoma, args=(-1,1000000,))

sum1.start()
sum2.start()
# Esperar terminação 
sum1.join()
sum2.join()

print(f'Resultado concorrente: {soma}')


Resultado seq: 0
Resultado concorrente: -239831


Note que o nosso programa é *não determinístico*: não podemos predizer o resultado final!

Neste caso, a variável global `soma` é um recurso compartilhado pelas duas threads. O uso desse recurso deve ser controlado: Em quanto executamos `soma = soma + step`, a outra thread não pode estar modificando o valor de `soma`. 

Podemos utilizas `Locks` para proteger o acesso concorrente aos dados: 

In [9]:
import threading

soma = 0

# Criar um Lock
L = threading.Lock()

def calcularsoma(step,n ):
    ''' calcular sum + step + step .... n times '''
    global soma
    for i in range(n):
        #Antes de utilizar uma variável compartilhada, a thread
        #deve adquirir o lock
        L.acquire() 
        soma = soma + step
        # Depois  de utilizar o recurso, a thread libera o lock 
        #(para que as outras threads possam ter acesso a ele)
        L.release() 
        
sum1 = threading.Thread(target=calcularsoma, args=(1,1000000,))
sum2 = threading.Thread(target=calcularsoma, args=(-1,1000000,))

# Versão concorrente 
sum1.start()
sum2.start()

# Esperar terminação 
sum1.join()
sum2.join()

print(f'Resultado concorrente: {soma}')


Resultado concorrente: 0


## Multiprocessing

O GIL (Global Interpreter Lock) de Python controla a execução das threads. Por simplicidade e robustez, o GIL só permite que uma thread tenha o controle da execução do interpretador. Se uma thread está esperando por operações de I/O (disco, rede, entrada de um usuário), o interpretador passa o controle a outra thread. Isto significa que as threads podem melhorar o desempenho de aplicações que utilizam frequentemente operações de entrada e saída.  Porém, quando as diferentes tarefas requerem o tempo do processador (por exemplo, cálculos complexos), adicionar threads não vai melhorar muito o desempenho. 

Uma alternativa aos threads em Python é o uso da biblioteca [multiprocessing](https://docs.python.org/3/library/multiprocessing.html) que permite executar  em paralelo (utilizando, por exemplo, os vários núcleos do processador) várias tarefas. Uma diferência fundamental entre threads e processos é que todos os threads criados compartilham o mesmo espaço na memória (como no caso da variável `soma`).  A comunicação entre os processos de multiprocessing deve ser implementada intercambiando 	mensagens o utilizando estruturas de dados como Filas. 

In [15]:
from multiprocessing import Process

l = []

def add(n):
    ''' adicionar 0..n-1 na lista l'''
    global l # Não comparilhada pelos 2 processos!
    for i in range(n):
        l.append(i)

if __name__ == "__main__"    :
    # Criando os processos         
    p1 = Process(target=add,args=(3,)) 
    p2 = Process(target=add,args=(5,)) 

    # Iniciando as tarefas
    p1.start()
    p2.start()

    # Esperando por terminaç˜åo
    p1.join()
    p2.join()
    print(l) # Continua vazia!
        

[]


In [16]:
# O mesmo código utilizando threads
l = []

def add(n):
    ''' adicionar 0..n-1 na lista l'''
    global l # Compartilhada pelos dois processos
    for i in range(n):
        l.append(i)

        
p1 = threading.Thread(target=add,args=(3,)) 
p2 = threading.Thread(target=add,args=(5,)) 
p1.start()
p2.start()
p1.join()
p2.join()
print(l) 
        

[0, 1, 2, 0, 1, 2, 3, 4]


Atenção! Várias threads atualizando a mesma lista pode resultar em saídas não esperadas:

In [17]:
l = list(range(1,21))

def inc(v):
    time.sleep(0.01)
    return v+1

def add():
    global l
    for i,_ in enumerate(l):
        l[i] = inc(l[i])
        

n = 1000
thrs = [threading.Thread(target=add) for _ in range(n)]         
for t in thrs:
    t.start()
for t in thrs:
    t.join()
    
print(l)    
    

[15, 15, 19, 17, 17, 23, 20, 23, 24, 24, 24, 28, 28, 32, 29, 32, 37, 39, 40, 45]


## Concorrência Declarativa / Funcional 

A grande vantagem da programação funcional é a ausência de efeitos colaterais: duas chamadas à mesma função (com os mesmos parâmetros), sempre retorna o mesmo valor. Se cada processo ou thread não interfere com o outro (e não compartilham informações), situações inesperadas não deveriam acontecer. Segue um exemplo utilizando `Pools`

In [None]:
from multiprocessing import Pool 

def inc(x):
    time.sleep(1)
    return x+1

if __name__ == "__main__": 
    l = [1,2,3]
    #=============
    # Versão sequencial
    print(f'Ini')
    t1 = time.perf_counter()
    print(list(map(inc,l)))
    t2 =  time.perf_counter() - t1
    print(f'Fim: {t2}')
    
    #======================
    # versão paralela
    print(f'Ini')
    t1 = time.perf_counter()
    with Pool(3) as p: 
        # Aplicar a mesma função (inc) a cada elemento da lista
        # utilizando, quando possível, diferentes processos  
        print(list(p.map(inc,l)))

    t2 =  time.perf_counter() - t1
    print(f'Fim: {t2}')        
    


Ini
[2, 3, 4]
Fim: 3.008370566999929
Ini


O comando `with Pool(3) as p` vai criar 3 processos (independentes) e cada um deles vai aplicar a função inc a um dos elementos da lista. 

Considere um programa que armazena em disco uma lista de URLs:

In [None]:
import requests
import tempfile
import os
import time
from multiprocessing import Pool

# A lista de URLs a serem salvas
l = ["https://docs.python.org/3/",
     "https://ufrn.br/",
     "https://www.uol.com.br/",
     "http://learnyouahaskell.com/",
     "https://sigaa.ufrn.br/"]

def download(url):
    '''Armazenar a url em um arquivo temporal'''
    res = requests.get(url)
    if res.status_code == 200:
        # Criar um arquivo temporal
        arquivo, nome = tempfile.mkstemp()
        # Salvar o conteúdo da página
        with os.fdopen(arquivo, 'w') as tmp:
            tmp.write(res.text)

            
    return nome

if __name__ == "__main__":
    # Versão sequencial
    print('ini')
    t1 = time.perf_counter()
    for url in l:
        print(download(url))

    t2 =  time.perf_counter() - t1
    print(f'Fim: {t2}')            

    # Versão Concorrente
    print('ini')
    t1 = time.perf_counter()
    with Pool(4) as p: 
        for arquivo in p.map(download,l):
            print(arquivo)
    t2 =  time.perf_counter() - t1
    print(f'Fim: {t2}')                

ini
/var/folders/t3/ctllf82x6mz44rwlp8gqcjkw0000gn/T/tmpffe_862_
/var/folders/t3/ctllf82x6mz44rwlp8gqcjkw0000gn/T/tmp07le2wif
/var/folders/t3/ctllf82x6mz44rwlp8gqcjkw0000gn/T/tmpf8ssk_jx
/var/folders/t3/ctllf82x6mz44rwlp8gqcjkw0000gn/T/tmpeocsk_tx
/var/folders/t3/ctllf82x6mz44rwlp8gqcjkw0000gn/T/tmpi_i6kf1b
Fim: 3.6276262499999987
ini


## Asynchronous I/O

As operações de I/O são relativamente mais lentas e, portanto, um gargalo. A sintaxe `async / await` (e o pacote asyncio) permitem especificar "corrotinas".  A ideia é simples: enquanto uma corrotina está esperando por  um resultado, o controle é transferido para as outras corrotinas para que sejam executadas. 

O *Loop de Eventos* se ocupa de gerenciar a execução concorrente de várias tarefas assíncronas. 


In [None]:
import asyncio

async def f():
    ''''''
    print('inicio')
    # Note o uso de asyncio.sleep: a versão async de time.sleep
    await asyncio.sleep(1)
    print('fim')

if __name__ == "__main__":
    asyncio.run(f())

In [None]:
import asyncio

# Notação async: g é uma coroutine
async def g(n):
    '''Antes de retornar n+1, espera por 1 segundo'''
    print(f'inicio: {n}')
    # await: esperar pela terminação de sleep
    await asyncio.sleep(1)
    print(f'fim: {n}')
    return n+1

async def f():
    # Esperar pela execução de g(1)
    x1 =  await g(1)
    # Esperar pela execução de g(2)
    x2 =  await g(2)
    
    print(f'resultado: {x1,x2}')

if __name__ == "__main__":
    asyncio.run(f())
    # Note que f() não executa a função!
    x = f()
    print(x) # <coroutine object f at 0xxxxx>

Note que `f()` não executa a função: Simplesmente retorna um objeto `coroutine`. Isso se parece bastante às funções geradoras! 

No exemplo anterior, note que as duas tarefas são executadas de forma sequencial porque a o comando `await` espera pela terminação (e retorno) da função. 

In [None]:
import asyncio

async def g(n):
    print(f'inicio: {n}')
    await asyncio.sleep(1)
    print(f'fim: {n}')
    return n+1

async def f():
    t1 = asyncio.create_task(g(1))
    t2 = asyncio.create_task(g(2))
    # As tarefas foram criadas e começam a ser executadas

    # Provavelmente a função ainda não determinou o valor de retorno.
    # Esse valor é um "future". Para determinar esse valor, devemos esperar
    await t1 
    await t2
    
    #t1.result() é o resultado retornado pela função g(1)
    print(f'resultado: {t1.result(),t2.result()}')

if __name__ == "__main__":
    asyncio.run(f())


In [None]:
import asyncio

# Uma forma mais simples de esperar por várias tarefas asíncronas

async def g(n):
    print(f'inicio: {n}')
    await asyncio.sleep(1)
    print(f'fim: {n}')
    return n+1

async def f():
    # Uma formal alternativa utilizando gather
    # g(1) e g(2) são executadas de maneira asíncrona
    # e esperamos pelos resultados
    r = await asyncio.gather(g(1), g(2))
    print(r)
    
 
if __name__ == "__main__":
    asyncio.run(f())


In [None]:
# Um exemplo mais concreto utilizando aiohttp (para realizar solicitações HTTP assíncronas) 
# e aiofiles (para escrever de maneira assíncrona arquivos). 
import time

# Framework async
import aiohttp
import asyncio
import aiofiles

# Só para testar a versão sequencial
import shutil
import requests

async def salvar(session, url):
    '''Salvar a imagem da url em disco'''
    
    fname="" # Nome do arquivo
    try:
        async with session.get(url) as resp: # solicitação HTTP
            if resp.status == 200:
                fname = url[-5:] # Nome da imagem
                f = await aiofiles.open(fname, mode='wb') # Esperar pela abertura  do arquivo
                await f.write(await resp.read()) # esperar pela escrita do arquivo
                await f.close() # fechar o arquivo
                
    except Exception as e:
        print(e)

    print(f'Imagem:  {fname}')    
    return fname
            
async def salvarAll(l):
    '''Salvar todas as imagens da lista'''
    async with aiohttp.ClientSession() as session: # Criar a conexão 
        tarefas = (salvar(session,url) for url in l) # Adicionar as tarefas
        t1 = time.perf_counter()
        r = await asyncio.gather(*tarefas) # Executar todas as tarefas (e esperar)
        t2 = time.perf_counter() - t1
        print(f'Tempo async: {t2}')
        
def salvarSeq(l):
    '''Versão sequencial'''
    t1 = time.perf_counter()
    for url in l:
        res = requests.get(url,stream=True)
        if res.status_code == 200:
            fname = "seq-" + url[-5:]
            with open(fname, 'wb') as arquivo:
                shutil.copyfileobj(res.raw, arquivo)
            print(fname)
                
    t2 = time.perf_counter() - t1
    print(f'Tempo seq: {t2}')           
            
if __name__ == "__main__":
    l = ["http://people.csail.mit.edu/brussell/research/LabelMe/Images/boston_static_street/0002.jpg",
         "http://people.csail.mit.edu/brussell/research/LabelMe/Images/boston_static_street/0003.jpg",
         "http://people.csail.mit.edu/brussell/research/LabelMe/Images/boston_static_street/0004.jpg",
         "http://people.csail.mit.edu/brussell/research/LabelMe/Images/boston_static_street/0006.jpg",
         "http://people.csail.mit.edu/brussell/research/LabelMe/Images/boston_static_street/0007.jpg"
        ]
    salvarSeq(l) # Versão sequencial       
    asyncio.run(salvarAll(l)) # Versão async