# Multiprocessing
***

Permite realizar processos em paralelo criando efetivamente um processo diferente de threads

**Vantagens**:

* Funciona em várias plataformas distintas diferente do **os.fork()**


* Perde em velocidade para threads porém efetivamente executa os programas em núcleos distintos da CPU

**Desvantagens**:

* Mudanças em um processo não afetam o outro


* Algumas estruturas (como o lambda) não podem ser rodadas em paralelo

Ela apresenta estruturas semelhantes aos modulos **threading** e **queue**

No Unix, usa um fork para criar um processo filho e invoca o método run de Process para roda-lo.

No windows um novo interpretador é criado usando o processo de criação de ferramentas do Windows passando um objeto codificado pelo **pickle** para um novo processo por meio de um pipe e começando  o **python -c** para rodar o programa que roda com uma função especial construida em python que lê e decodifica o objeto e invoca o método run().

Modulo popular para execução paralela: **Scipy**

***
### Exemplo Threads
***

In [1]:
# Importar os modulos
import os
from multiprocessing import Process, Lock

In [2]:
def how_am_i(label, lock):
    with lock:
        print("{0}: nome={1}, pid={2}".format(label, __name__, os.getpid()))

In [3]:
# Process cria os direntes processos assim como o Thread
def main():
    lock = Lock()
    how_am_i("Chamada da função", lock)
    process = Process(target=how_am_i, args=('filho criado', lock))
    process.start()
    process.join()
    for i in range(5):
        Process(target=how_am_i, args=(('processo rodando {0}'.format(i)), lock)).start()
    with lock:
        print('Saida da main thread')

In [4]:
main()

Chamada da função: nome=__main__, pid=3335
filho criado: nome=__main__, pid=3357
processo rodando 1: nome=__main__, pid=3361
processo rodando 0: nome=__main__, pid=3360
processo rodando 2: nome=__main__, pid=3363
processo rodando 3: nome=__main__, pid=3366
Saida da main thread
processo rodando 4: nome=__main__, pid=3371


***
### Exemplo Comunicação Interprocessual
***

Além da comunicação interprocessual por meio de pipes nomeados, o modulo multiprocessing possui algumas ferramentas que podem ser usadas para realizar a comunicação interprocessual

In [5]:
import os
from multiprocessing import Process, Pipe

In [6]:
# Manda o objeto para pai por meio de um pipe anônimo
def sender(pipe):
    # Manda a mensagem
    pipe.send(['spam', 42, 'ovos'])
    # Fecha o pipe
    pipe.close()

In [7]:
# Manda e recebe objetos por um pipe
def communicator(pipe):
    # Manda um dicionario
    pipe.send(dict(name='João', spam=42))
    # Espera uma resposta
    answer = pipe.recv()
    # Imprime a resposta
    print('Comunicador recebeu:', answer)

In [8]:
def main():
    # Pontos pipein e pipeout
    (father_result, son_result) = Pipe()
    # Primeiro processo mandando do filho para o pai atraves da função sender
    Process(target=sender, args=(son_result, )).start()
    # Imprimir o que o pai recebeu no processo acima
    print("Pai recebeu:", father_result.recv())
    # Fecha a comunicação do pipe do lado do pipe
    father_result.close()
    
    # Pontos pipein e pipeout
    (father_result, son_result) = Pipe()
    # Processo mandando do filho para o pai novamente através da função comunicador
    son = Process(target=communicator, args=(son_result, ))
    son.start()
    # Imprimir o que o pai recebeu no processo acima
    print("Pai recebeu:", father_result.recv())
    # Saida pai manda algo para o filho
    father_result.send({x * 2 for x in 'spam'})
    # Espero o filho terminar a execução
    son.join()
    
    print('Saiu de pai')

In [9]:
main()

Pai recebeu: ['spam', 42, 'ovos']
Comunicador recebeu: {'ss', 'mm', 'pp', 'aa'}
Pai recebeu: {'spam': 42, 'name': 'João'}
Saiu de pai


***
### Exemplo Queues
***

In [10]:
from multiprocessing import Process, Queue, JoinableQueue
import time

In [11]:
class Consumer(Process):
    
    def __init__(self, queue_task, queue_result):
        Process.__init__(self)
        self.queue_task = queue_task
        self.queue_result = queue_result
        
    def run(self):
        process_name = self.name
        while True:
            next_task = self.queue_task.get()
            if next_task is None:
                print("{0}: Saindo".format(process_name))
                self.queue_task.task_done()
                break
            print('{0}: {1}'.format(process_name, next_task))
            answer = next_task()
            self.queue_task.task_done()
            self.queue_result.put(answer)
        return

In [12]:
class Task(object):
    
    def __init__(self, n1, n2):
        self.n1 = n1
        self.n2 = n2
        
    def __call__(self):
        time.sleep(0.1)
        return '{0} * {1} = {2}'.format(self.n1, self.n2, self.n1 * self.n2)
    
    def __str__(self):
        return '{0} * {1}'.format(self.n1, self.n2)

In [13]:
def main():
    # Possui algumas funcionalidades a mais que a Queue normal
    tasks = JoinableQueue()
    results = Queue()

    number_of_consumers = 5
    print('Criando {0} consumidores'.format(number_of_consumers))
    
    consumers = []
    for i in range(number_of_consumers):
        consumers.append(Consumer(tasks, results))
        
    for consumer in consumers:
        consumer.start()
        
    number_of_works = 5
    for i in range(number_of_works):
        tasks.put(Task(i, i))
    
    # Indicar o fim da tarefa
    for i in range(number_of_consumers):
        tasks.put(None)
    
    # Espera todas as tasks terminar
    tasks.join()
    
    # Imprime resultados
    while number_of_works:
        result = results.get()
        print("Resultado:", result)
        number_of_works -= 1

In [14]:
main()

Criando 5 consumidores
Consumer-12: 3 * 3
Consumer-9: 0 * 0
Consumer-10: 2 * 2
Consumer-11: 4 * 4
Consumer-13: 1 * 1
Consumer-11: Saindo
Consumer-10: Saindo
Consumer-9: Saindo
Consumer-13: Saindo
Consumer-12: Saindo
Resultado: 0 * 0 = 0
Resultado: 4 * 4 = 16
Resultado: 2 * 2 = 4
Resultado: 1 * 1 = 1
Resultado: 3 * 3 = 9


***
### Exemplo event
***

Multiprocessing também possui uma estrutura chamada event que permite que haja comunicação dos estados entre processos. O evento pode ter os estados **set** e **unset**

In [15]:
from multiprocessing import Event, Process
import time

In [16]:
# Espera a ocorrencia do evento antes de fazer qualquer coisa
def wait_for_event(event):
    print("A espera pelo evento começou!")
    event.wait()
    print("Espera pelo evento event.is_set():", event.is_set())

In [17]:
# Espera time segundos para o evento, e depois timeout
def wait_for_event_timeout(event, time):
    print("A espera pelo timeout do evento iniciada!")
    event.wait(time)
    print("Espera pelo timeout do evento event.is_set():", event.is_set())

In [18]:
def main():
    event = Event()
    process1 = Process(name='block', target=wait_for_event, args=(event, ))
    process1.start()
    process2 = Process(name='not-block', target=wait_for_event_timeout, args=(event, 2))
    process2.start()
    
    print("Main: Esperando antes de chamar o Event.set()")
    time.sleep(3)
    event.set()
    print("Main: Evento está pronto!")

In [19]:
main()

A espera pelo timeout do evento iniciada!
A espera pelo evento começou!
Main: Esperando antes de chamar o Event.set()
Espera pelo timeout do evento event.is_set(): False
Main: Evento está pronto!
Espera pelo evento event.is_set(): True


***
### Exemplo pool
***

Por fim nós temos as pools que permitem que um número fixo de **workers** possam ser usados em casos simples. Isso significa que dada um número de operações a serem realizadas e um número de recursos onde elas podem ser realizadas as pools vão alocar as operações nos recursos dada a limitação no número de recursos até que todas as operações sejam terminadas. É uma forma de otimizar a utilização de recursos de seu computador

In [20]:
import multiprocessing

In [21]:
def calculate(data):
    return data * 2

In [22]:
def start_process():
    # Nome do processo rodando no momento
    print("Iniciando", multiprocessing.current_process().name)

In [23]:
def main():
    inputs = list(range(10))
    print("Input:", inputs)
    
    builtin_outputs = map(calculate, inputs)
    print("Built-in:", list(builtin_outputs))
    
    # Pega o número de CPUs do seu pc
    pool_size = multiprocessing.cpu_count() * 2
    # Cria um pool com tamanho e um inicializador
    pool = multiprocessing.Pool(processes=pool_size, initializer=start_process)
    
    pool_outputs = pool.map(calculate, inputs)
    pool.close()
    # Espera o termino dos processos
    pool.join()
    
    print("Pool:", pool_outputs)

In [24]:
main()

Input: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Built-in: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Iniciando ForkPoolWorker-16
Iniciando ForkPoolWorker-17
Iniciando ForkPoolWorker-18
Iniciando ForkPoolWorker-19
Iniciando ForkPoolWorker-21
Iniciando ForkPoolWorker-20
Iniciando ForkPoolWorker-22
Iniciando ForkPoolWorker-23
Pool: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
