# 멀티 프로세서
    멀티 프로세스는 별도의 메모리 영역을 가지며, 특별한 메커니즘으로만 통신할 수 있다. 프로세서는 각 스레드에 대해 별도의 레지스터 집합을 불러오거나 저장하는데, 프로세스 간 데이터 공유와 통신용으로는 비효율적이다. 파이썬에서는 멀티 프로세스 방식에 subproccess 모듈을 사용한다.

# 멀티 스레드
    단일 프로세스 내의 멀티 스레드는 동일한 메모리에 접근한다. 스레드는 데이터 공유를 통해 간단하게 통신하는데, threding 모듈의 처리를 통해 한번에 한 스레드만 메모리 영역에 접근할 수 있다. 각 프로세스가 독립적인 스택, 힙, 코드, 데이터 영역을 가지는 반면, 한 프로세스에 속한 스레드는 스택 영역을 제외한 메모리 영역을 공유한다. 

파이썬에 스레드 매커니즘이 있긴 하지만, 진정한 병렬 실행이 지원되는 것은 아니다. 하지만 프로세스를 병렬로 사용하는 것은 가능하며, 이 정도도 오늘날 운영 체제에서는 충분히 효율적이다.

동시성은 논리적으로 여러 작업이 동시에 실행되는 것처럼 보이는 것이다 .예를 들어 I/O 연산 등은 프로그램의 흐름에 큰 짐이 될 수있따. 이럴 때 한 작업의 I/O 연산이 완료되기를 기다리는 동안 다른 작업을 수행하여 유휴 시간을 활용하는 것이 동시성이다.

병렬성은 물맂거으로 여러 작업이 동시에 처리되는 것이다. 데이터 병렬성과 작업 병렬성으로 나눌 수 있따. 데이터 병렬성은 같은 작업을 병렬처리하는 것이다. 하나의 커다란 작업에서 전체 데이터를 쪼갠 후 병렬처리하면 작업을 빠르게 수행할 수 있따. 작업 병렬성은 서로 다른 작업을 병렬처리하는 것이다. 웹 서버에서는 독립적인 요청을 병렬로 개별적으로 처리할 수 있다.


1. subprocess 모듈

subprocess 모듈은 부모-자식(parent-child) 프로세스 쌍을 생성하는 데 사용된다. 부모 프로세스는 사용자에 의해 실행된다. 부모 프로세스는 차례로 다른 일을 처리하는 자식 프로세스의 인스턴스를 실행한다. 부모 프로세스는 차례로 다른 일을 처리하는 자식 프로세스의 인스턴스를 실행한다. 자식 프로세스를 사용함으로써, 멀티 코어의 이점을 최대한 취하고, 동시성 문제를 운영 체제가 알아서 처리하도록 한다.



In [13]:

# window용
# 현재 디렉토리의 파일 목록을 출력합니다.
# import subprocess
# result = subprocess.run(['dir'], capture_output=True, text=True)
# print(result.stdout)

import multiprocessing
import subprocess

def run_command(command):
    process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
    output, error = process.communicate()
    return (output, error)

if __name__ == '__main__':
    commands = ['dir', 'type file.txt', 'echo "Hello, world!"']
    pool = multiprocessing.Pool(processes=len(commands))
    results = [pool.apply_async(run_command, args=(cmd,)) for cmd in commands]
    output = [result.get() for result in results]
    print(output)



2. threading 모듈

스레드가 여러 개로 분리되면, 스레드 간 데이터 공유의 복잡성이 증가한다. 또한 락과 데드락을 회피하는데 주의를 기울여야 한다. 파이썬 프로그램에는 단 하나의 메인 스레드만 존재한다. 멀티 스레드를 사용하려면 threading모듈을 사용한다.

내부적으로 락을 관리하려면 queue 모듈을 사용한다. 큐에 의존하면 자원의 접근을 직렬화 할 수 있고, 이는 곧 하나의 스레드만 데이터에 접근할 수 있게 한다는 뜻이다. 실행 중인 스레드가 있는 동안에는 프로그램은 종료되지 않는다. 워커 스레드가 작업을 완료해쓴ㄴ데도, 프로그램이 종료되지 않고 계속 실행되는 경우 문제가 될 수 있다. 
스레드를 데몬으로 변환하면 데몬 스레드가 실행되지 않느 즉시 프로그램이 종료된다. queue.join() 메서드는 큐가 빌 때까지 기다린다.

In [27]:
import queue
import threading

q = queue.Queue()

def worker(num):
    while True:
        item = q.get()
        if item is None:
            break
        #  작업을 처리한다.
        print("스레드 {0}: 처리 완료 {1}".format(num+1,item))
        q.task_done()
if __name__ =="__main__":
    num_worker_threads = 5
    threads = []
    for i in range(num_worker_threads):
        t= threading.Thread(target=worker, args=(i,))
        t.start()
        threads.append(t)

    for item in range(20):
        q.put(item)

    # 모든 작업이 끝날 때까지 대기한다.(block)
    q.join()
    # 워커 스레드를 종료한다.(stop)
    for i in range(num_worker_threads):
        q.put(None)
    for t in threads:
        t.join()


스레드 1: 처리 완료 0스레드 4: 처리 완료 1
스레드 4: 처리 완료 2
스레드 4: 처리 완료 3
스레드 4: 처리 완료 4
스레드 4: 처리 완료 5
스레드 4: 처리 완료 6
스레드 4: 처리 완료 7
스레드 4: 처리 완료 8
스레드 4: 처리 완료 9
스레드 4: 처리 완료 10
스레드 4: 처리 완료 11
스레드 4: 처리 완료 12
스레드 4: 처리 완료 13
스레드 4: 처리 완료 14
스레드 4: 처리 완료 15
스레드 4: 처리 완료 16
스레드 4: 처리 완료 17
스레드 4: 처리 완료 18
스레드 4: 처리 완료 19



3. 뮤텍스와 세마포어

Mutex(뮤텍스)는 락과 같다. 뮤텍스는 공유 리소스에 한 번에 하나의 스레드만 접근 할 수 있도록 하는 상호 배제 동시성 제어 정책을 강제하기 우해 설계 되었다. 예를 들어 한 스레드가 배열을 수정하고 있다고 가정하는 겨웅, 배열 작업을 절반 이상 수행했을 때, 프로세서가 다른 스레드로 전환했다고 하자, 여기에서 뮤텍스를 사용하지 않는다면, 두 스레드가 동시에 배열을 수정하는 일이 벌어진다.

개념적으로, 뮤텍스는 1부터 시작하는 정수로, 스레드는 배열을 변경해야 할 때 마다 뮤텍스를 '잠근다.' 즉, 스르데는 뮤텍스가 양수가 될 때까지 대기한 다음 숫자를 1 감소시킨다.(lock 의 개념)
배열 수정을 마치면 뮤텍스를 잠근 후, 수정 작업이 끝나고 잠금을 해제하면, 두 스레드가 배열을 동시에 수정하는 일은 일어나지 않는다.

In [34]:
from threading import Thread, Lock
import threading

def worker(mutex ,data, thread_safe):
    if thread_safe:
        mutex.acquire()

    try:
        print("스레드 {0}: {1}\n".format(threading.get_ident(),data))
    finally:
        if thread_safe:
            mutex.release()

if __name__=="__main__":
    threads = []
    thread_safe = False
    mutex = Lock()
    for i in range(20):
        t = Thread(target=worker, args=(mutex,i,thread_safe))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()

스레드 1408: 0

스레드 20168: 1

스레드 12844: 2

스레드 18596: 3

스레드 17196: 4

스레드 12252: 5

스레드 16264: 6

스레드 21304: 7

스레드 10452: 8

스레드 20940: 9

스레드 10832: 10

스레드 4060: 11

스레드 21364: 12

스레드 22264: 13

스레드 23332: 14

스레드 22004: 15

스레드 23172: 16

스레드 11748: 17

스레드 21040: 18

스레드 10236: 19



위의 결과는 실행할 때마다 결과가 다르게 나올 것이다. 이제 뮤텍스를 사용하기 위해 thread_safe 변수를 Ture로 설정한 후 다시 코드를 실행해보자.

In [38]:
from threading import Thread, Lock
import threading

def worker(mutex ,data, thread_safe):
    if thread_safe:
        mutex.acquire()

    try:
        print("스레드 {0}: {1}\n".format(threading.get_ident(),data))
    finally:
        if thread_safe:
            mutex.release()

if __name__=="__main__":
    threads = []
    thread_safe = True
    mutex = Lock()
    for i in range(20):
        t = Thread(target=worker, args=(mutex,i,thread_safe))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()

스레드 22608: 0

스레드 23336: 1

스레드 22756: 2

스레드 21788: 3

스레드 23544: 4

스레드 21908: 5

스레드 20488: 6

스레드 7292: 7

스레드 14872: 8

스레드 13004: 9

스레드 10596: 10

스레드 13836: 11

스레드 5200: 12

스레드 20704: 13

스레드 19952: 14

스레드 2812: 15

스레드 21620: 16

스레드 17956: 17

스레드 20144: 18

스레드 17472: 19



한편, 세마포어는 뮤텍스보다 더 일반적으로 사용되는 개념이다. 세마포어는 1보다 큰 수로 시작할 수 있다. 세마포어 값은 곧 한 번에 자원에 접근할 수 있는 스레드의 수다. 세마포어는 뮤텍스의 락 및 언락 작업과 유사한 대기 및 신호 작업을 지원한다.bb

In [40]:
# 세마포어 예시

import threading 
import time

class ThreadPool(object):
    def __init__(self):
        self.active=[]
        self.lock = threading.Lock()
    def acquire(self, name):
        with self.lock:
            self.active.append(name)
            print("획득: {0} | 스레드 풀: {1}".format(name,self.active))

    def release(self, name):
        with self.lock:
            self.active.remove(name)
            print("반환: {0} | 스레드 풀: {1}".format(name,self.active))


def worker(semaphore, pool):
    with semaphore:
        name = threading.currentThread().getName()
        pool.acquire(name)
        time.sleep(1)
        pool.release(name)

if __name__ == "__main__":
    threads = []
    pool = ThreadPool()
    semaphore = threading.Semaphore(3)
    for i in range(10):
        t = threading.Thread(
            target=worker, name="스레드 " + str(i), args=(semaphore, pool)
        )
        t.start()
        threads.append(t)
    for t in threads :
        t.join()



  name = threading.currentThread().getName()
  name = threading.currentThread().getName()


획득: 스레드 0 | 스레드 풀: ['스레드 0']
획득: 스레드 1 | 스레드 풀: ['스레드 0', '스레드 1']
획득: 스레드 2 | 스레드 풀: ['스레드 0', '스레드 1', '스레드 2']
반환: 스레드 0 | 스레드 풀: ['스레드 1', '스레드 2']
반환: 스레드 1 | 스레드 풀: ['스레드 2']
반환: 스레드 2 | 스레드 풀: []
획득: 스레드 4 | 스레드 풀: ['스레드 4']
획득: 스레드 3 | 스레드 풀: ['스레드 4', '스레드 3']
획득: 스레드 5 | 스레드 풀: ['스레드 4', '스레드 3', '스레드 5']
반환: 스레드 3 | 스레드 풀: ['스레드 4', '스레드 5']
반환: 스레드 4 | 스레드 풀: ['스레드 5']
반환: 스레드 5 | 스레드 풀: []
획득: 스레드 6 | 스레드 풀: ['스레드 6']
획득: 스레드 7 | 스레드 풀: ['스레드 6', '스레드 7']
획득: 스레드 8 | 스레드 풀: ['스레드 6', '스레드 7', '스레드 8']
반환: 스레드 7 | 스레드 풀: ['스레드 6', '스레드 8']
반환: 스레드 6 | 스레드 풀: ['스레드 8']
반환: 스레드 8 | 스레드 풀: []
획득: 스레드 9 | 스레드 풀: ['스레드 9']
반환: 스레드 9 | 스레드 풀: []


4. 데드락과 스핀락

데드락은 두 개 이상의 프로세스나 스레드가 서로 상대방의 작업이 끝나기만을 기다리고 있기 떄문에 결과적으로 아무것도 완료되지 못하는 상태다.
 프로그램에서 락을 할당하고, 락을 순서대로 획득한다면, 교착 상태를 막을 수 있다.

 다음 네 가지 조건을 모두 충족하면 데드락이 발생한다. 네 가지 조건 중 하나라도 막을 수 있다면, 데드락 문제를 해결할 수 있다.

 - 상호 배제(mutual exclusion) : 자원은 한 번에 한 프로세스(혹은 스레드)만 사용할 수 있다.
 - 점유와 대기(hold and wait) : 한 프로세스가 자원을 가지고 있느 상태에서, 다른 프로세스가 쓰는 자원의 반납을 기다린다.
 - 비선점(no preemption) : 프로세스 1,2,3이 있다고 가정할 때 1은 2가 점유한 자원을 2는 3이 점유한 자원을 3은 1이 점유한 자원을 대기하는 상태

 스핀락(spinlock)은 고성능 컴퓨팅 상황에 유용한 바쁜 대기(busy waiting)의 한 형태다. 스피낡은 임계 구역에 진입이 불가능할 때, 진입이 가능할 때까지 반복문을 돌면서 재시도하는 방식으로 구현된 락이다.

 5. 스레딩에 대한 구글 파이썬 스타일 가이드

 내장 타입의 원자성에 의존하지 않는다. 딕셔너리 같은 파이썬 기본 데이터 타입은 원자적 연산을 수행하는 반면, 내장 타입이 원자적이지 않은 경우가 있어서, 내장 타입의 원자성에 의존해선 안 된다. 또한 원자적 변수 할당에 의존하지 않아야 한다.(why? -> dictionary 에 의존하기 때문?)

 queue 모듈의 Queue 데이터 타입을 스레드 간 데이터를 전달하는 기본 방식으로 사용한다. 그렇지 않으면, threading 모듈의 락을 사용한다. 저수준의 락 대신, threading.Condintion을 사용할 수 있도록 조건 변수를 적절하게 사용하는 방법을 숙지한다. 생산자- 소비자 모데르이 간단한 예제를

In [42]:
import threading


def consumer(cond):
    name = threading.currentThread().getName()
    print("{0} 시작".format(name))
    with cond:
        print("{0} 대기".format(name))
        cond.wait()
        print("{0} 자원 소비".format(name))

def producer(cond):
    name = threading.currentThread().getName()
    print("{0} 시작".format(name))
    with cond:
        print("{0} 자원 생산 후 모든 소비자에게 알림".format(name))
        cond.notifyAll()

if __name__ == "__main__":
    condition = threading.Condition()
    consumer1 = threading.Thread(name='소비자1', target=consumer , args=(condition,))
    consumer2 = threading.Thread(name='소비자2', target=consumer , args=(condition,))
    producer1 = threading.Thread(name='생산자', target=producer , args=(condition,))

    consumer1.start()
    consumer2.start()
    producer1.start()


소비자1 시작
소비자1 대기
소비자2 시작
소비자2 대기
생산자 시작
생산자 자원 생산 후 모든 소비자에게 알림
소비자2 자원 소비
소비자1 자원 소비


  name = threading.currentThread().getName()
  name = threading.currentThread().getName()
  name = threading.currentThread().getName()
  name = threading.currentThread().getName()
  cond.notifyAll()


# 단위 테스트

개별 함수 및 클래스의 메서드에 대한 테스트 코드를 작성한여, 예상한 값이 맞게 나오는지 확인하는 것은 좋은 습관이다.
파이썬 표준 라이브러리는 이러한 단위 테스트를 위해 doctest와 unittest 모듈을 제공한다. 또한 외부 라이브러리인 pytest 모듈도 존재

1. 용어

- 테스트 픽스쳐(test fixture): 테스트 설정을 위한 코드(테스트용 입력 파일을 만들었다 삭제하는 코드)
- 테스트 케이스(test case): 테스트의 기본 단위
- 테스트 슈트(test suite): unitest.TestCase의 하위 클래스에 의해 생성된 테스트 케이스 집합. 각 테스트 케이스의 메서드 이름은 test로 시작한다.
- 테스트 러너(test runner): 하나 이상의 테스트 스위트를 실행하는 객체

2. doctest

doctest 모듈은 모듈과 함수의 docstring 안에 테스트 코드를 작성할 떄 사용한다. 테스트를 작성한 후, 다음 코드 세 줄만 추가하면 된다.


In [None]:
if __name__ == "__main__":
    import doctest
    doctest.testmod()