# Better Way 39 스레드 간의 작업을 조율하려면 Queue를 사용하자

파이프라인 (pipeline): 일렬로 이어진 단계들로 구성되며, 각 단계에는 특정 함수가 연결되어 있음.
- 새 작업 요소는 끊임없이 파이프라인의 앞쪽에 추가됨
- 각 함수는 동시에 자신이 속한 단계에 배정된 작업 요소를 처리할 수 있음.
- 남아있는 단계가 더는 없을 때까지, 각 함수에서 처리를 완료할 때마다 작업은 다음 단계로 이동함.
- 파이썬으로 쉽게 병렬화할 수 있는 블로킹 I/O나 서브프로세스를 이용하는 작업에 특히 잘 맞는다. (BW 37 참고)

스레드 안전 생산자-소비자 큐 (thread-safe producer-consumer queue)
- 파이프라인 단계 사이에서 작업을 전달할 방법의 모델링
- 파이썬에서 스레드 안전성의 중요성 이해 (BW 38, BW 46 참고)

In [2]:
import threading

class MyQueue(object):
    def __init__(self):
        self.items = dequeue()
        self.lock = Lock()
        
    def put(self, item): #생산자 (producer)
        with self.lock:
            self.items.append(item)
            
    def get(self): #소비자 (consumer)
        with self.lock:
            return self.items.popleft()

다음은 큐에서 작업을 꺼내와서 함수를 실행하고, 결과를 다른 큐에 넣는 파이썬 스레드
- 파이프라인의 각 단계를 표현
- 작업 스레드가 새 입력을 몇 번이나 체크하고, 작업을 얼마나 완료하는지 추적

In [6]:
from threading import Thread

class Worker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.polled_count = 0
        self.work_done = 0
        
    def run(self):
        while True:
            self.polled_count += 1
            try:
                item = self.in_queue.get()
            except IndexError: #이전 단계에서 아직 작업을 완료하지 않아서 입력 큐가 비어 있는 경우
                sleep(0.01) #재운다 (=기다림)
            else:
                result = self.func(item)
                self.out_queue.put(result)
                self.work_done += 1

In [None]:
download_queue = MyQueue()
resize_queue = MyQueue()
upload_queue = MyQueue()
done_queue = MyQueue()

#download, resize, upload는 각각 단계에서 실행할 함수 (여기서는 정의하지 않아서 실행하면 에러 발생함)
threads = [
    Worker(download, download_queue, resize_queue),
    Worker(resize, resize_queue, upload_queue),
    Worker(upload, upload_queue, done_queue)
]

In [None]:
for thread in threads:
    thread.start()
    
for _ in range(1000):
    download_queue.put(object()) #실제 이미지 데이터가 아니라, 일반 object 인스턴스를 1000개 넣음

In [None]:
while len(done_queue.items) < 1000:
    # 위에서 입력한 1000개의 인스턴스가 모두 처리될 때까지
    # 기다리는 동안 유용한 작업을 수행함
    # ...

In [None]:
#1000개의 아이템을 처리하는 동안 몇번이나 폴링되었는지 확인 -- 매우 많음 (3030번)
processed = len(done_queue.items)
polled = sum(t.polled_count for t in threads)
print('Processed', processed, 'items after polling', polled, 'times')

왜 이런 일이 발생하는가?
- 작업 수행 함수의 실행 속도가 제각각이면, 초기 단계가 후속 단계의 진행을 막아 파이프라인이 정체됨
- 후속 단계에서 처리할 것이 없어서 지속적으로 새 작업을 가져오려고 짧은 주기로 입력 큐를 확인함
- 결국 작업 스레드는 유용한 작업을 전혀 하지 않으면서 CPU 시간을 낭비하는 일이 됨 (=끊임없이 IndexError 예외를 일으키고 잡는 일만 함)

추가적인 문제
- 입력 작업을 모두 완료했는지 판단하려면 done_queue에 결과가 모두 쌓일 때까지 기다려야 함
- Worker의 run 메서드는 루프에서 끊임없이 실행되며, 루프를 빠져나오도록 작업 스레드에 신호를 줄 방법이 없음
- 최악의 문제로 파이프라인이 정체되면 프로그램이 제멋대로 고장이 날 수 있음. (예컨데, 1단계는 빠르고 2단계는 느린 경우 1-2단계를 잇는 큐의 크기가 계속 증가하여 2단계의 처리가 큐가 증가하는 속도를 따라잡지 못해 메모리 부족 발생)

결론: 위의 구현은 잘 작동하기는 해도 **잘못된 구현** 즉, 파이프라인이 나쁘다는 것이 아니라 좋은 생산자-소비자 큐를 직접 만들기 어렵다는 사실.

해결: 내장 모듈 queue에 있는 **Queue 클래스** 사용
- 새 데이터가 생길 때까지 get 메서드가 블록되게 하여 작업 스레드가 계속해서 데이터가 있는지 체크하는 상황 (busy waiting) 없애줌

In [8]:
from queue import Queue
queue = Queue()

def consumer():
    print('Consumer waiting')
    queue.get() #뒤에 나오는 put() 이후에 실행함
    print('Consumer done')

In [9]:
# 스레드가 처음으로 실행할 때도 Queue 인스턴스에 아이템이 들어가서 get 메서드에서 반환할 아이템이 생기기 전에는 마치지 못함.
thread = Thread(target=consumer)
thread.start()
print('Producer putting')
queue.put(object()) # 앞에 나온 get() 이전에 실행함
thread.join()
print('Producer done')

Consumer waitingProducer putting

Consumer done
Producer done


- 파이프라인 정체 문제를 해결하려면 두 단계 사이에서 대기할 작업의 최대 개수를 Queue에 설정해야 함
- 큐가 이미 이 버퍼 크기만큼 가득 차 있으면 put 호출이 블록됨

In [12]:
import time 

queue = Queue(1) # 버퍼 크기는 1

def consumer():
    time.sleep(0.1)            # 대기
    queue.get()                # 두 번째로 실행
    print('Consumer got 1')
    queue.get()                # 네 번쨰로 실행
    print('Consumer got 2')

In [13]:
thread = Thread(target=consumer)
thread.start()

queue.put(object())            # 첫 번째로 실행
print('Producer put 1')
queue.put(object())            # 세 번쨰로 실행
print('Producer put 2')
thread.join()
print('Producer done')

Producer put 1
Consumer got 1
Producer put 2
Consumer got 2
Producer done


- consumer 스레드에서 get 호출하기 전에, 생산 스레드에서 put으로 객체 두 개를 큐에 집어넣는 동작이 일어나야 함.
- 그러나 Queue 크기가 1이므로, 두번째 put 호출이 블록된 상태에서 빠져나와서 두번째 아이템을 큐에 넣으려면, 
- 큐에 아이템을 추가하는 생산자는 소비 스레드에서 적어도 한번은 get을 호출하기를 기다려야 함

In [14]:
in_queue = Queue()

def consumer():
    print('Consumer waiting')
    work = in_queue.get()
    print('Consumer working')
    # do something
    # ...
    print('Consumer done')
    in_queue.task_done()

In [15]:
Thread(target=consumer).start()

Consumer waiting


In [16]:
in_queue.put(object())
print('Producer waiting')
in_queue.join()
print('Producer done')

Producer waiting
Consumer working
Consumer done
Producer done


Queue 클래스는 task_done 메서드로 작업 진행을 추적할 수 있음
- 작업 진행을 추적하면, 특정 단계의 입력 큐가 빌 때까지 기다릴 수 있으므로 파이프라인의 끝에서 done_queue를 폴링하지 않아도 됨.
- 생산자는 조인으로 소비 스레드를 대기하거나 폴링을 하지 않아도 됨.
- Queue 인스턴스의 join 호출하여 in_queue가 완료하기를 기다리면 됨.
- 설사 큐가 비더라도 in_queue의 join 메서드는 이미 큐에 추가된 모든 아이템에 task_done을 호출할 때까지 완료되지 않음.

In [17]:
class ClosableQueue(Queue):
    SENTINEL = object()
    
    def close(self):
        self.put(self.SENTINEL) #더는 입력 아이템이 없음을 알리는 특별 아이템을 추가
        
    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return # (더 이상 큐에 아이템이 없으므로) 스레드 종료
                yield item
            finally:
                self.task_done()

In [18]:
class  StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        
    def run(self):
        for item in self.in_queue:
            result = self.func(item)
            self.out_queue.put(result)

In [None]:
download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()

threads = [
    StoppableWorker(download, download_queue, resize_queue),
    StoppableWorker(resize, resize_queue, upload_queue),
    StoppableWorker(upload, upload_queue, done_queue)
]

In [None]:
for thread in threads:
    thread.start()
for _ in range(1000):
    download_queue.put(object())
download_queue.close() #첫번째 단계의 모든 입력이 큐에 추가되면, 큐를 닫는다

In [None]:
# 각 단계들을 연결하는 큐에 join을 호출해서 작업이 완료되기를 기다리고, 
# 각 단계가 끝날 때마다 입력 큐를 닫아서 다음 단계에 중단 신호를 보낸다 
download_queue.join()
resize_queue.close()
resize_queue.join()
upload_queue.close()
upload_queue.join()
print(done_queue.qsize(), 'items finished')

핵심 정리
- 파이프라인은 여러 파이썬 스레드를 사용하여 동시에 실행하는 작업의 순서를 구성하기에 아주 좋은 방법이다.
- 병행 파이프라인을 구축할 때는 많은 문제 (바쁜 대기, 작업자 중단, 메모리 부족) 일어날 수 있다는 점을 주의하자.
- Queue 클래스는 연산 블로킹, 버퍼 크기, 조인 등 견고한 파이프라인을 만드는 데 필요한 기능을 모두 갖췄다.