# 확장과 구조

## 멀티프로세싱과 멀티스레딩

### 멀티스레드는 GIL로 한계

In [2]:
%%writefile ch11/multithread.py
import random
import threading

if __name__ == '__main__':
    results = []
    def compute():
        results.append(sum([random.randint(1,100) for i in range(1000000)]))
    workers = [threading.Thread(target=compute) for x in range(8)]
    
    for worker in workers: 
        worker.start()
    for worker in workers: 
        worker.join()
    print("Results: %s" % results)

Writing ch11/multithread.py


In [None]:
$>/usr/bin/time python ch11/multithread.py 
Results: [50493197, 50521691, 50479554, 50504657, 50524166, 50483934, 50520410, 50473516]
    
5.94 user 0.02 system 0:05.91 elapsed 100%CPU    # 100% CPU

### 멀티프로세스로 병렬 효과 극대화

In [4]:
%%writefile ch11/multiprocess.py
import random
import multiprocessing

if __name__ == '__main__':

    def compute(n):
        return sum([random.randint(1,100) for i in range(1000000)])
    pool = multiprocessing.Pool(processes=8)
    results = pool.map(compute, range(8))
    
    print("Results: %s" % results)

Overwriting ch11/multiprocess.py


In [None]:
$> /usr/bin/time python ch11/multiprocess.py 
Results: [50484952, 50476807, 50554903, 50493749, 50491296, 50478998, 50548086, 50498540]
6.64 user 0.01 system 0:01.24 elapsed 536%CPU  # 약 6개의  CPU 사용

## asyncio를 이용한 이벤트 반응형 아키텍쳐

In [1]:
import aiohttp
import asyncio

async def get(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return response

loop = asyncio.get_event_loop()
coroutines = [get('http://example.com') for _ in range(8)]
results = loop.run_until_complete(asyncio.gather(*coroutines))

print("Results: %s" % results)

RuntimeError: This event loop is already running

## 서비스 지향 아키텍쳐 

In [2]:
import multiprocessing
import random
import zmq

def compute():
    return sum([random.randint(1,100) for i in range(1000000)])
               
def worker():
    context = zmq.Context()
    work_receiver = context.socket(zmq.PULL)
    work_receiver.connect("tcp://0.0.0.0:15555")
    result_sender = context.socket(zmq.PUSH)
    result_sender.connect("tcp://0.0.0.0:15556")
    poller = zmq.Poller()
    poller.register(work_receiver, zmq.POLLIN)
    
    while True:
        socks = dict(poller.poll())
        if socks.get(work_receiver) == zmq.POLLIN:
            obj = work_receiver.recv_pyobj()
            result_sender.send_pyobj(obj())

context = zmq.Context()

# 작업이 보내질 채널
work_sender = context.socket(zmq.PUSH)
work_sender.bind("tcp://0.0.0.0:15555")

# 작업 결과를 받는 채널
result_receiver = context.socket(zmq.PULL)
result_receiver.bind("tcp://0.0.0.0:15556")

# 작업자 8개 시작
processes = []
for i in range(8):
    p = multiprocessing.Process(target=worker)
    p.start()
    processes.append(p)
    
# 8개 작업 전송
for x in range(8):
    work_sender.send_pyobj(compute)
    
# 8개 결과 읽음
results = []
for x in range(8):
    results.append(result_receiver.recv_pyobj())
    
# 프로세스 종료
for p in processes:
    p.terminate()
print('Results: %s' % results)


Results: [50459386, 50510083, 50555825, 50509112, 50498972, 50464487, 50524045, 50550262]
