# 6.1 멀티 프로세스와 멀티 스레드

6.1 멀티 프로세스와 멀티 스레드
- 운영 체제에서 실행되는 각 프로그램은 각각이 별도의 프로세스이다.
- 각 프로세스에는 하나 이상의 스레드가 있다.
- 멀티프로세스와 멀티스레드라는 두 가지 방법을 사용하면 프로그램의 작업 부하를 분산시킬 수 있다.

1. 멀티프로세스
    - 각 프로세스마다 별도의 메모리 영역을 가지고, 특별한 메커니즘(ex.큐)으로만 통신가능하다.
    - 파이썬에서는 subprocess 모듈을 사용한다.
2. 멀티스레드
    - 단일 프로세스 내의 멀티 스레드는 동일한 메모리에 접근한다.
    - 데이터 공유를 통해서 간단하게 통신하는데, threading 모듈의 처리를 통해서 한번에 한 스레드만 메모리 영역에 접근할 수 있다.(파이썬의 GIL 제한...)
    - 각 프로세스는 독립적인 스택, 힙, 코드, 데이터 영역을 가지지만, 한 프로세스에 속한 스레드들은 스택을 제외하고 다른 메모리 영역은 공유한다.


**동시성 :
    논리적으로 여러작업이 동시에 실행되는 것 처럼 보이는 것

**병렬성 :
    물리적으로 진짜 여러 작업이 동시에 처리되는 것

6.1.1 subprocess 모듈

In [None]:
import subprocess

subprocess.run(["echo", "이것은 subprocess입니다."])

subprocess.run(["sleep", "10"])

CompletedProcess(args=['sleep', '10'], returncode=0)

6.1.2 Threading 모듈

- 멀티 스레드를 사용하려면 threading 모듈을 사용한다.
- 내부적으로 락을 관리하려면 queue모듈을 사용한다.
- 큐에 의존하면 자원의 접근을 직렬화할 수 있고, 즉 한 번에 하나의 스레드만 데이터에 접근할 수 있게 한다는 말이다.

In [None]:
# 할일 20개를 5개의 스레드가 나누어서 처리한다는 코드

import queue
import threading

q = queue.Queue()

def worker(num):
    while True:
        item = q.get()              #q라는 큐에서 하나씩 가져와서 item이라는 변수에 지정하겠다.
        if item is None :           #못 가져오면 break 하겟다
            break

        #작업을 처리한다.
        print("스레드 {0} : 처리 완료 {1}".format(num+1, item))
        #왜 스레드의 인덱스가 순서대로 안나오는가! -> 
        q.task_done()

if __name__ == "__main__":
    num_worker_threads = 5
    threads = []
    for i in range(num_worker_threads):
        t = threading.Thread(target=worker, args = (i,))
        t.start()
        threads.append(t)
    
    for item in range(20):
        q.put(item)

    #모든 작업이 끝날 때까지 대기한다(block).
    q.join()

    #워커 스레드를 종료한다(stop).
    for i in range(num_worker_threads):
        q.put(None)
    for t in threads:
        t.join()


스레드 1 : 처리 완료 0
스레드 1 : 처리 완료 1
스레드 1 : 처리 완료 2
스레드 1 : 처리 완료 3
스레드 1 : 처리 완료 4
스레드 1 : 처리 완료 5
스레드 1 : 처리 완료 6
스레드 1 : 처리 완료 7
스레드 1 : 처리 완료 8
스레드 1 : 처리 완료 9스레드 4 : 처리 완료 10
스레드 3 : 처리 완료 11
스레드 4 : 처리 완료 12
스레드 4 : 처리 완료 13
스레드 3 : 처리 완료 14
스레드 3 : 처리 완료 15
스레드 3 : 처리 완료 16
스레드 3 : 처리 완료 17
스레드 3 : 처리 완료 18
스레드 3 : 처리 완료 19



6.1.3 뮤텍스와 세마포어
- 뮤텍스는 락과 같고, 공유 리소스에 한 번에 하나의 스레드만 접근할 수 있도록 하는 상호 배제 동시성 제어 정책을 강제하기위해 설계되었다.

- 뮤텍스가 양수일 때부터 스레드는 일을 시작 할 수 있음
- 뮤텍스가 1부터 시작 -> 어떤 스레드가 일을 시작할 때 뮤텍스 값을 1 감소시킨다(락한다는 의미) -> 일을 끝내면 뮤텍스 값을 1 증가시킨다(언락한다는 의미)


In [None]:
from threading import Thread, Lock
import threading

def worker(mutex, data, thread_safe):
    if thread_safe:
        mutex.acquire()
    try:
        print("스레드 {0} : {1}\n".format(threading.get_ident(), data))
    finally :
        if thread_safe:
            mutex.release()

if __name__ == "__main__":
    threads = []
    thread_safe = False
    mutex = Lock()

    for i in range(20):
        t = Thread(target = worker, args=(mutex, i, thread_safe))
        t.start()
        threads.append(t)

    for t in threads:
        t.join()

스레드 140393333991168 : 0

스레드 140393333991168 : 1

스레드 140393333991168 : 2
스레드 140393350776576 : 3


스레드 140393350776576 : 4

스레드 140393333991168 : 5

스레드 140393333991168 : 6

스레드 140393333991168 : 7

스레드 140393333991168 : 8

스레드 140393333991168 : 9

스레드 140393333991168 : 10
스레드 140393350776576 : 11

스레드 140393350776576 : 12


스레드 140393333991168 : 13
스레드 140393350776576 : 14


스레드 140393333991168 : 15

스레드 140393333991168 : 16
스레드 140393350776576 : 17


스레드 140393333991168 : 18

스레드 140393333991168 : 19



In [None]:
#thread_safe 변수를 True로 지정.. -> mutex.acquire가 실행됨

from threading import Thread, Lock
import threading

def worker(mutex, data, thread_safe):
    if thread_safe:
        mutex.acquire()
    try:
        print("스레드 {0} : {1}\n".format(threading.get_ident(), data))
    finally :
        if thread_safe:
            mutex.release()

if __name__ == "__main__":
    threads = []
    thread_safe = True
    mutex = Lock()

    for i in range(20):
        t = Thread(target = worker, args=(mutex, i, thread_safe))
        t.start()
        threads.append(t)

    for t in threads:
        t.join()

스레드 140393333991168 : 0

스레드 140393350776576 : 1

스레드 140393333991168 : 2

스레드 140393359169280 : 3

스레드 140393350776576 : 4

스레드 140393367561984 : 5

스레드 140393333991168 : 6

스레드 140393325598464 : 7

스레드 140393350776576 : 8

스레드 140393325598464 : 9

스레드 140393325598464 : 10

스레드 140393350776576 : 11

스레드 140393325598464 : 12

스레드 140393333991168 : 13

스레드 140393367561984 : 14

스레드 140393350776576 : 15

스레드 140393333991168 : 16

스레드 140393333991168 : 17

스레드 140393350776576 : 18

스레드 140393333991168 : 19



- 세마포어는 뮤텍스보다 더 일반적인 개념..
- 세마포어는 1보다 더 큰 수로 시작할 수 있다.
- 즉, 세마포어 값은 곧 한 번에 자원에 접근할 수 있는 스레드의 수다(스레드가 접근할 때마다 값을 1 감소시키기 때문에)

In [None]:
# with문 이해하기 https://m.blog.naver.com/PostView.nhn?blogId=wideeyed&logNo=221653260516&proxyReferer=https:%2F%2Fwww.google.com%2F

import threading
import time

class ThreadPool(object):
    def __init__(self):
        self.active = []
        self.lock = threading.Lock()
    
    def acquire(self, name):        #name 이라는 걸 획득했다는 걸 알려주고, 그 결과를 출력
        with self.lock:
            self.active.append(name)
            print("획득 : {0} | 스레드 풀 : {1}".format(name, self.active))
            #프린트 해줄려고 acquire 함수를 재정의 하였다
    
    def release(self, name):        #name 이라는 걸 반환했다는 걸 알려주고, 그 결과를 출력
        with self.lock:
            self.active.remove(name)
            print("반환 : {0} | 스레드 풀 : {1}".format(name, self.active))
            #프린트 해줄려고 release 함수를 재정의 하였다

def worker(semaphore, pool):
    #with : 자원을 획득하고 사용후 반납해야하는 경우에 주로 사용한다
    with semaphore:     
        name = threading.currentThread().getName()
        pool.acquire(name)
        time.sleep(1)
        pool.release(name)

if __name__ == "__main__":
    threads = []
    pool = ThreadPool()
    semaphore = threading.Semaphore(3)      #세마포어를 3으로 지정, 세마포어 객체 생성, 3개씩 스레드를 반환 및 획득을 함

    for i in range(10):
        t = threading.Thread(target = worker, name="스레드 "+ str(i), args=(semaphore, pool))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    
    print(threads)

획득 : 스레드 0 | 스레드 풀 : ['스레드 0']
획득 : 스레드 1 | 스레드 풀 : ['스레드 0', '스레드 1']
획득 : 스레드 2 | 스레드 풀 : ['스레드 0', '스레드 1', '스레드 2']
반환 : 스레드 0 | 스레드 풀 : ['스레드 1', '스레드 2']
반환 : 스레드 2 | 스레드 풀 : ['스레드 1']
획득 : 스레드 3 | 스레드 풀 : ['스레드 1', '스레드 3']
획득 : 스레드 4 | 스레드 풀 : ['스레드 1', '스레드 3', '스레드 4']
반환 : 스레드 1 | 스레드 풀 : ['스레드 3', '스레드 4']
획득 : 스레드 5 | 스레드 풀 : ['스레드 3', '스레드 4', '스레드 5']
반환 : 스레드 3 | 스레드 풀 : ['스레드 4', '스레드 5']
획득 : 스레드 6 | 스레드 풀 : ['스레드 4', '스레드 5', '스레드 6']
반환 : 스레드 4 | 스레드 풀 : ['스레드 5', '스레드 6']
반환 : 스레드 5 | 스레드 풀 : ['스레드 6']
획득 : 스레드 8 | 스레드 풀 : ['스레드 6', '스레드 8']
획득 : 스레드 7 | 스레드 풀 : ['스레드 6', '스레드 8', '스레드 7']
반환 : 스레드 6 | 스레드 풀 : ['스레드 8', '스레드 7']
획득 : 스레드 9 | 스레드 풀 : ['스레드 8', '스레드 7', '스레드 9']
반환 : 스레드 8 | 스레드 풀 : ['스레드 7', '스레드 9']
반환 : 스레드 7 | 스레드 풀 : ['스레드 9']
반환 : 스레드 9 | 스레드 풀 : []
[<Thread(스레드 0, stopped 140149778765568)>, <Thread(스레드 1, stopped 140149787158272)>, <Thread(스레드 2, stopped 140149828597504)>, <Thread(스레드 3, stopped 140149836990208)>, <Thread(스레드 4, stopped 140149

6.1.4 데드락과 스핀락

**데드락
- 두 개 이상의 프로세스나 스레드가 서로 상대방의 작업이 끝나기만을 기다리고 있기 때문에 겨로가적으로 아무것도 완료되지 못하는 상태.
- 이 네가지를 충족하면 데드락이 발생함.
   1. 상호 배제 : 자원은 한번에 한 프로세스(스레드)만 사용할 수 있다.
   2. 점유와 대기 : 한 프로세스가 자원을 갖고있으면서 다른 프로세스가 자원을 버리는 거를 기다리고 있는 상태
   3. 비선점 : 다른 프로세스가 이미 점유한 자원을 강제로 뺏어오지 못한다.
   4. 순환 대기 : 서로가 서로의 점유자원을 대기하고 있는 상태.

**스핀락
- 바쁜대기(특정 공유자원에 대해 두개 이상의 프로세스나 스레드가 그 이용 권한을 획득하려고 대기하는 현상)의 한 형태이다.

6.1.5 스레딩에 대한 구글 파이썬 스타일 가이드

In [1]:
#원자성 : 여러개의 쓰레드가 있을 때 특정 시점에 어떤 메서드를 두개 이상의 쓰레드가 동시에 호출 못한다는 것 (같이 일을 못한다!)

import threading

def consumer(cond):
    name = threading.currentThread().getName()
    print("{0} 시작".format(name))
    with cond:
        print("{0} 대기".format(name))
        cond.wait()     #통지되거나 시간제한이 만료될 때까지 기다림
        print("{0} 자원 소비".format(name))

def producer(cond):
    name = threading.currentThread().getName()
    print("{0} 시작".format(name))
    with cond:
        print("{0} 자원 생산 후 모든 소비자에게 알림".format(name))
        cond.notifyAll()

if __name__ == "__main__" :
    condition = threading.Condition()
    consumer1 = threading.Thread(name = "소비자1", target = consumer, args = (condition,))          #스레드 이름이 소비자1
    consumer2 = threading.Thread(name = "소비자2", target = consumer, args = (condition,))          #소비자 시작, 대기 하고나서 cond.wait()
    producer = threading.Thread(name = "생산자", target = producer, args = (condition,))            #생산자 시작, 자원생성후 notifyAll()을 조짐 -> 그제서야 소비자들의 wait가 풀림

    consumer1.start()
    consumer2.start()
    producer.start()

#파이썬 스레드 참조 : https://velog.io/@wltjs10645/Python-thread2

소비자1 시작
소비자1 대기
소비자2 시작
소비자2 대기
생산자 시작
생산자 자원 생산 후 모든 소비자에게 알림
소비자1 자원 소비
소비자2 자원 소비


# 6.2 좋은 습관

6.2.1 가상 환경

6.2.2 디버깅

파이썬 디버거 pdb를 이용하면 디버깅을 할 수 있다.

pdb의 명령어
- c : 프로그램을 끝까지 실행한다.
- s : 코드 다음 줄로 넘어간다.
- n : 코드 다음 줄로 넘어가되, s와 다른 점은 어떤 함수를 만날 경우 함수 전체를 실행한 뒤 다음 줄로 넘어간다는 것이다
- p : 표현식의 값을 출력한다
- l : 다음 실행할 코드를 몇 줄 보여준다.
- h : 도움말

In [None]:
import pdb
pdb.set_trace()

--KeyboardInterrupt--
--KeyboardInterrupt--
--KeyboardInterrupt--


6.2.3 프로파일링

프로그램이 예상보다 매우 느리거나, 많은 메모리가 소비된다면, 자료구조나 알고리즘을 잘못 선택했을 가능성이 많다. 이 경우 성능 항목을 검토한다.

- 읽기 전용 데이터는 리스트 대신 튜플을 사용한다.
- 반복문에서 항목이 많은 리스트나 튜플 대신에 제너레이터를 사용해서 순회한다.
- 문자열을 연결할 때 리스트에 문자열을 추가한 후 마지막에 리스트의 항목을 모두 하나로 연결한다.

In [1]:
# 좋은예 - 리스트에 문자열을 추가한 것이다.
items = ['<table>']
for last_name, first_name in employee_list:
    items.append('<tr><td>%s, %s</td></tr>' % (last_name, first_name)
    items.append('</table>')
    employee_table = ' '.join(items)

SyntaxError: ignored

cProfile 모듈

호출 시간에 대한 세부 분석을 제공하며, 병목현상을 찾는 데 사용된다.

In [2]:
import cProfile
cProfile.run('main()')

         3 function calls in 0.000 seconds

   Ordered by: standard name

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000    0.000    0.000 <string>:1(<module>)
        1    0.000    0.000    0.000    0.000 {built-in method builtins.exec}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}




NameError: ignored

In [3]:
import cProfile
import time

def sleep():
    time.sleep(5)

def hello_world():
    print("Hello World!")

def main():
    sleep()
    hello_world()

cProfile.run('main()')

Hello World!
         42 function calls in 5.006 seconds

   Ordered by: standard name

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000    5.005    5.005 <ipython-input-3-f3eb07e42a32>:10(main)
        1    0.000    0.000    5.005    5.005 <ipython-input-3-f3eb07e42a32>:4(sleep)
        1    0.000    0.000    0.000    0.000 <ipython-input-3-f3eb07e42a32>:7(hello_world)
        1    0.000    0.000    5.005    5.005 <string>:1(<module>)
        3    0.000    0.000    0.000    0.000 iostream.py:195(schedule)
        2    0.000    0.000    0.000    0.000 iostream.py:307(_is_master_process)
        2    0.000    0.000    0.000    0.000 iostream.py:320(_schedule_flush)
        2    0.000    0.000    0.000    0.000 iostream.py:382(write)
        3    0.000    0.000    0.000    0.000 iostream.py:93(_event_pipe)
        3    0.000    0.000    0.000    0.000 socket.py:438(send)
        3    0.000    0.000    0.000    0.000 threading.py:1050(_wa

timeit 모듈

In [4]:
import timeit
timeit.timeit("x = 2 + 2")

0.015593015999911586

In [5]:
timeit.timeit("x = sum(range(10))")

0.34820497499981684

In [7]:
import time

def sum0fN2(n):
    start = time.time()
    theSum = 0
    for i in range(1,n+1):
        theSum = theSum + i
    end = time.time()
    return theSum, end-start

if __name__ == "main__":
    n = 5
    print("총 합계 : %d\t 시간 : %10.7f초" % sum0fN2(n))
    n = 200
    print("총 합계 : %d\t 시간 : %10.7f초" % sum0fN2(n))

# 6.3 단위 테스트

6.3.1 용어

- 테스트 픽스쳐 : 테스트 설정을 위한 코드
- 테스트 케이스 : 테스트의 기본 단위
- 테스트 스위트 : unittest.TestCase 의 하위 클래스에 의해 생성된 테스트 케이스 집합
- 테스트 러너 : 하나 이상의 테스트 스위트를 실행하는 객체

6.3.2 doctest

doctest 모듈은 모듀과 함수의 독스트링 안에 테스트 코드를 작성할 때 사용한다. 테스트를 작성한 후, 다음 코드 세줄만 추가하면 된다.

In [None]:
if __name__ = "__main__"        # : 을 안붙혀도 되나?
    import doctest
    doctest.testmod()

doctest 모듈이 포함된 프로그램은 두가지 방법으로 실행가능...-v 옵션

In [12]:
def factorial(n):
    import math
    if not n >= 0 :
        raise ValueError("n must be >= 0")
    
    if math.floor(n) != n:
        raise ValueError("n must be exact integer")
    
    if n+1 == n :
        raise OverflowError("n is too large")
    
    result = 1
    factor = 2
    while factor <= n:
        result *= factor
        factor += 1

    return result

if __name__ == "__main__":
    import doctest
    doctest.testmod()

print(factorial(30))
#print(factorial(-1))
print([factorial(n) for n in range(6)])

265252859812191058636308480000000
[1, 1, 2, 6, 24, 120]


In [13]:
#unittest 모듈과 함께 실행도 가능

import doctest
import unittest
import doctest_factorial

suite = unittest.TestSuite()
suite,addTest(doctest.DocTestSuite(doctest_factorial))
runner = unittest.TextTestRunner()
print(runner.run(suite))

ModuleNotFoundError: ignored

6.3.3 pytest

일단 pytest 라이브러리 설치.

In [None]:
def func(x):
    return x + 1

def test_answer():
    assert func(3) == 51