# 13. Multiprocessing – When a Single CPU Core Is Not Enough

* Local multiprocessing
* Remote multiprocessing
* Data sharing and synchronization between processes

## Multithreading versus multiprocessing

### Multithreading

* Still executed within a single process
* Effectively uses a single CPU core: because of the GIL (Global Interpreter Lock), only one thread executes Python bytecode at a time
* Python 3 performs at least as well as Python 2 here
* For CPU-bound tasks, multithreading does not improve performance
* For disk I/O-bound tasks it helps, but asyncio is recommended
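To illustrate the I/O-bound case, here is a minimal sketch that uses `time.sleep` as a stand-in for blocking I/O. That substitution is an assumption: real disk or network calls behave this way only when they release the GIL while waiting. Because sleeping threads release the GIL, four short waits overlap instead of adding up.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io(seconds):
    # Hypothetical stand-in for blocking I/O; time.sleep releases the GIL
    time.sleep(seconds)
    return seconds


def run_sequential(delays):
    start = time.perf_counter()
    for delay in delays:
        fake_io(delay)
    return time.perf_counter() - start


def run_threaded(delays):
    start = time.perf_counter()
    # One thread per wait, so all of them can block at the same time
    with ThreadPoolExecutor(max_workers=len(delays)) as executor:
        list(executor.map(fake_io, delays))
    return time.perf_counter() - start


if __name__ == '__main__':
    delays = [0.2] * 4
    sequential = run_sequential(delays)
    threaded = run_threaded(delays)
    print('sequential: %.2fs, threaded: %.2fs' % (sequential, threaded))
```

Replacing `fake_io` with a CPU-bound function such as `busy_wait` removes the advantage, which is what the measurement below shows.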

In [1]:
import datetime
import threading

def busy_wait(n):
    # Pure CPU-bound work: count down from n
    while n > 0:
        n -= 1

if __name__ == '__main__':
    n = 10000000
    start = datetime.datetime.now()
    for _ in range(4):
        busy_wait(n)
    end = datetime.datetime.now()
    print('The single threaded loops took: %s' % (end - start))
    start = datetime.datetime.now()
    threads = []
    for _ in range(4):
        thread = threading.Thread(target=busy_wait, args=(n,))
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()
    end = datetime.datetime.now()
    print('The multithreaded loops took: %s' % (end - start))

The single threaded loops took: 0:00:03.831617
The multithreaded loops took: 0:00:04.150431


### Multiprocessing

* Executed in multiple processes
* These can even be processes on remote machines
* Not limited by the GIL: each process has its own interpreter and its own GIL

In [3]:
import datetime
import multiprocessing


def busy_wait(n):
    # Same CPU-bound countdown as in the threading example
    while n > 0:
        n -= 1

if __name__ == '__main__':
    n = 10000000
    start = datetime.datetime.now()
    processes = []
    for _ in range(4):
        process = multiprocessing.Process(
            target=busy_wait, args=(n,))
        process.start()
        processes.append(process)
    for process in processes:
        process.join()
    end = datetime.datetime.now()
    print('The multiprocessed loops took: %s' % (end - start))

The multiprocessed loops took: 0:00:02.546087


In [6]:
import multiprocessing
print(multiprocessing.cpu_count())  # counts logical CPUs, so hyper-threads are included

4


## Hyper-threading versus physical CPU cores

Using more processes than there are physical CPU cores has little effect
* It may even be slower because of the extra context-switching cost

Recommendations
* Disk I/O bound? => a single process
* CPU bound? => as many processes as there are physical CPU cores
* Network I/O bound? => start with the defaults and tune if needed
* No obvious bound, but many parallel processes are needed? => try asyncio first
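A hedged sketch of turning these recommendations into a process count. The helper `pick_pool_size` and its `kind` strings are hypothetical. The standard library only reports logical CPUs (`os.cpu_count()` includes hyper-threads), so halving that number assumes two hardware threads per physical core; a third-party package such as psutil can report physical cores directly instead.

```python
import os


def usable_cpus():
    # On Linux, sched_getaffinity respects CPU pinning (e.g. in containers);
    # elsewhere fall back to os.cpu_count(), which may return None
    if hasattr(os, 'sched_getaffinity'):
        return len(os.sched_getaffinity(0))
    return os.cpu_count() or 1


def pick_pool_size(kind):
    """Hypothetical helper mapping a workload kind to a process count."""
    logical = usable_cpus()
    if kind == 'disk_io':
        return 1
    if kind == 'cpu':
        # Assumption: 2 hardware threads per physical core (hyper-threading)
        return max(1, logical // 2)
    if kind == 'network_io':
        return None  # None lets multiprocessing.Pool choose its default size
    raise ValueError('unknown workload kind: %r' % kind)


if __name__ == '__main__':
    for kind in ('disk_io', 'cpu', 'network_io'):
        print(kind, pick_pool_size(kind))
```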

In [10]:
import datetime
import multiprocessing


def busy_wait(n):
    # CPU-bound work: count down from n
    while n > 0:
        n -= 1


if __name__ == '__main__':
    n = 10000000
    processes = 1  # rerun with 2, 4, 8 and 16 to compare
    start = datetime.datetime.now()
    with multiprocessing.Pool(processes=processes) as pool:
        # Execute the busy_wait function 8 times with parameter n
        pool.map(busy_wait, [n for _ in range(8)])
    end = datetime.datetime.now()
    print('The multiprocessed loops took: %s' % (end - start))

The multiprocessed loops took: 0:00:07.995279

Timings for different values of processes (from a different run than the output above):

| processes | time |
|---|---|
| 1 | 0:00:05.297707 |
| 2 | 0:00:02.701344 |
| 4 | 0:00:01.477845 |
| 8 | 0:00:01.579218 |
| 16 | 0:00:01.595239 |
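The comparison across process counts can be automated. This is a sketch, not the original measurement: it uses `time.perf_counter` and a smaller `n` (an assumption, to keep the run short; scale it up to match the cell above). With a small `n`, process start-up cost is a larger share of each timing.

```python
import time
import multiprocessing


def busy_wait(n):
    # CPU-bound work: count down from n
    while n > 0:
        n -= 1


def time_pool(processes, n, tasks=8):
    # Time how long a pool of the given size needs for `tasks` calls
    start = time.perf_counter()
    with multiprocessing.Pool(processes=processes) as pool:
        pool.map(busy_wait, [n] * tasks)
    return time.perf_counter() - start


if __name__ == '__main__':
    n = 1000000
    timings = {p: time_pool(p, n) for p in (1, 2, 4, 8, 16)}
    for processes, seconds in timings.items():
        print('%2d processes: %.2fs' % (processes, seconds))
```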

## Creating a pool of workers

Use what already exists (multiprocessing.Pool) instead of writing your own pool

In [13]:
import time
import multiprocessing


def busy_wait(n):
    # CPU-bound work: count down from n
    while n > 0:
        n -= 1


if __name__ == '__main__':
    n = 10000000
    items = [n for _ in range(8)]
    with multiprocessing.Pool() as pool:
        results = []
        start = time.time()
        print('Start processing...')
        for _ in range(5):
            # map_async() returns an AsyncResult immediately without blocking
            results.append(pool.map_async(busy_wait, items))
            print('Still processing %.3f' % (time.time() - start))
        for result in results:
            # wait() blocks until this batch of 8 calls has finished
            result.wait()
            print('Result done %.3f' % (time.time() - start))
        print('Done processing: %.3f' % (time.time() - start))

Start processing...
Still processing 0.001
Still processing 0.001
Still processing 0.001
Still processing 0.001
Still processing 0.001
Result done 5.700
Result done 11.004
Result done 16.292
Result done 22.097
Result done 28.178
Done processing: 28.178


## Sharing data between processes

Don't share
* Rather than sharing data between processes, let each process handle only its own (local) data

If you must share anyway
* Pipe, Namespace, Queue

Synchronization takes time
* From a few milliseconds to several hundred milliseconds

Namespace
* Behaves like a regular object
* namespace = manager.Namespace() (where manager = multiprocessing.Manager())
  * namespace.spam = 123
  * namespace.eggs = 456

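Pipe
* Bidirectional communication by default
* Returns two connection objects, one for each end; with duplex=False, one end only reads and the other only writes
* Multiple processes/endpoints can be combined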

Whatever you do, locking costs time
* Regular database server: a limit of about 10 transactions
* SSD: about 100 transactions
* Even good hardware cannot avoid this cost


## Remote processes

The feature exists, but it is poorly documented.

In [None]:
# Start the driver (server) program
# constants is a user-defined module (not shown) that provides port,
# password (authkey must be bytes, e.g. b'secret') and a primes(n) function
import constants
import multiprocessing
from multiprocessing import managers

queue = multiprocessing.Queue()
manager = managers.BaseManager(address=('', constants.port),
                               authkey=constants.password)
manager.register('queue', callable=lambda: queue)
manager.register('primes', callable=constants.primes)
server = manager.get_server()
server.serve_forever()

In [None]:
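# Submit the work items (the numbers 0 to 999) to the shared queue
# functions is a user-defined module (not shown) providing host, port and password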

from multiprocessing import managers
import functions

manager = managers.BaseManager(address=(functions.host, functions.port),
                               authkey=functions.password)
manager.register('queue')
manager.connect()
queue = manager.queue()
for i in range(1000):
    queue.put(i)

In [None]:
# Worker in each worker machine

from multiprocessing import managers
import functions

manager = managers.BaseManager(address=(functions.host, functions.port),
                               authkey=functions.password)
manager.register('queue')
manager.register('primes')
manager.connect()
queue = manager.queue()

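while not queue.empty():
    # manager.primes() runs in the server process; this worker only fetches
    # the next number and prints the result it gets back. Another worker may
    # take the last item between empty() and get(), in which case get() blocks.
    print(manager.primes(queue.get()))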

### Distributed processing using ipyparallel

Supports:
* Single program, multiple data (SPMD) parallelism
* Multiple program, multiple data (MPMD) parallelism
* Message passing using MPI
* Task farming
* Data parallelism

IPython enables all types of parallel applications to be developed, executed, debugged, and monitored interactively.


Consists of four components:
* The IPython engine
  * listens for requests over the network, runs code, and returns results
* The IPython hub
  * keeps track of engine connections, schedulers, clients, as well as all task requests and results
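* The IPython schedulers
  * route task requests from clients to the engines, giving an asynchronous interface to them
* The IPython client
  * the object used to connect to the cluster and submit work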



$ ipcluster start -n 4

In [1]: import ipyparallel as ipp

In [2]: c = ipp.Client()

In [3]: c.ids
Out[3]: [0, 1, 2, 3]

In [4]: c[:].apply_sync(lambda : "Hello, World")
Out[4]: ['Hello, World', 'Hello, World', 'Hello, World', 'Hello, World']