# 스레드 간의 작업을 조율하려면 Queue를 사용하자

많은 작업을 동시에 실행하는 파이썬 프로그램은 작업 조율이 필요하다. 가장 유용한 병행 작업 방식 중 하나는 함수의 파이프라인이다.

파이프라인은 일렬로 이어진 단계들로 구성되며, 각 단계에는 특정 함수가 연결되어 있다. 새 작업 요소는 끊임없이 파이프라인의 앞쪽에 추가된다. 각 함수는 동시에 자신이 속한 단계에 배정된 작업 요소를 처리할 수 있다. 남아 있는 단계가 더는 없을 때까지, 각 함수에서 처리를 완료할 때마다 작업은 다음 단게로 이동한다.

이 방법은 파이썬으로 쉽게 병렬화할 수 있는 블로킹 I/O나 서브프로세스를 이용하는 작업에 특히 잘 맞는다.

## 예시: 디지털 카메라  

1. 새 이미지 추출
2. 리사이즈 함수로 다운 이미지 처리
3. 업로드로 리사이즈된 이미지 소비

## 작업을 동시에 처리하기 위해 조립하는 방법

### 1. 파이프라인 단계 사이에서 작업을 전달할 방버

스레드 안전 생산자-소비자 큐로 모델링 가능

In [13]:
from queue import deque
from threading import Lock

class MyQueue(object):
    def __init__(self):
        self.items = deque()
        self.lock = Lock()
        
    # 생산자인 디지털 카메라는 새 이미지를 대기 아이템 리스트의 끝에 추가
    def put(self, item):
        with self.lock:
            self.items.append(item)
            
    # 소비자인 처리 파이프라인의 첫 번째 단계에서는 대기 아이템 리스트의 앞쪽에서 이미지를 꺼내온다.
    def get(self):
        with self.lock:
            return self.items.popleft()

이런 큐에서 작업을 꺼내와서 함수를 실행한 후 결과를 또 다른 큐에 넣는 파이썬 스레드로 파이프라인의 각 단계 표현, 스레드가 새 입력을 몇 번이나 체크하고, 작업을 얼마나 완료하는지 추적한다.

입력 큐가 비어있는 경우를 작업 스레드에서 적절히 처리하는 과정이 까다롭다. 다음 코드에서 IndexError 예외를 잡는 부분이 이에 해당한다. 이 경우를 조립 라인이 정체된 상황으로 보면 된다.

In [14]:
from threading import Thread

class Worker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.polled_count = 0
        self.work_done = 0
        
    def run(self):
        while True:
            self.polled_count += 1
            try:
                item = self.in_queue.get()
            except IndexError:
                sleep(0.01) # 처리할 아이템이 없음
            else:
                result = self.func(item)
                self.out_queue.put(result)
                self.work_done += 1

In [16]:
download_queue = MyQueue()
resize_queue = MyQueue()
upload_queue = MyQueue()
done_queue = MyQueue()

def download():
    pass
def resize():
    pass
def upload():
    pass
threads = [
    Worker(download, download_queue, resize_queue),
    Worker(resize, resize_queue, upload_queue),
    Worker(upload, upload_queue, done_queue),
]

In [19]:
for thread in threads:
    thread.start()
for _ in range(1000):
    download_queue.put(object())

RuntimeError: threads can only be started once

In [21]:
while len(done_queue.items) < 1000:
    # 기다리는 동안 유용한 작업을 수행함
    # ...

SyntaxError: unexpected EOF while parsing (<ipython-input-21-04d9b03e301f>, line 3)

run 메서드에서 IndexError 예외를 잡는 까다로운 부분이 많이 실행된다.

In [22]:
processed = len(done_queue.items)
polled = sum(t.polled_count for t in threads)
print('Processed', processed, 'items after polling', polled, 'times')

Processed 0 items after polling 3 times


작업 수행 함수의 실행 속도가 다르면 초기 단계가 후속 단계의 진행을 막아 파이프라인이 정체될 수 있다. 그러면 후속 단계에서 처리할 것이 없어서 지속적으로 새 작업을 가져오려고 짧은 주기로 입력 큐를 확인하게 된다.

### 피해야 할 문제

1. 입력 작업을 모두 완료했는지 판단하기 위해 done_queue에 결과가 모두 쌓일 때까지 기다려야 한다.
2. Worker의 run 메서드는 루프에서 끊임없이 실행된다. 루프를 빠져나오도록 작업 스레드에 신호를 줄 방법이 없다.
3. 최악의 문제로 파이프라인이 정체 시 프로그램이 고장난다.

## Queue로 문제 해결하기

내장 모듈 queue에 들어있는 Queue 클래스는 이런 문제를 해결하는 데 필요한 기능을 모두 제공한다.

Queue는 새 데이터가 생길 때 까지 get 메서드가 블록되게 하여 작업 스레드가 계속해서 데이터가 있는지 체크하는 상황을 없애준다.

In [24]:
from queue import Queue
queue = Queue()

def consumer():
    print('Consumer waiting')
    queue.get()
    print('Consumer done')
    
thread = Thread(target=consumer)
thread.start()

## 스레드가 처음으로 실행할 때도 Queue 인스턴스에 아이템이 들어가서 get 메서드에서 반환할 아이템이 생기기 전에는 마치지 못한다.

print('Producer puttting')
queue.put(object())
thread.join()
print('Producer done')

Consumer waiting
Producer puttting
Consumer done
Producer done


파이프라인 정체 문제를 해결하기 위해 두 단계 사이에서 대기할 작업의 최대 개수를 Queue에 설정해야 한다. 큐가 이미 이 버퍼 크기만큼 가득 차 있으면 put 호출이 블록된다.

In [33]:
import time

queue = Queue(1)

def consumer():
    time.sleep(0.1)
    queue.get()
    print('Consumer got 1')
    queue.get()
    print('Consumer got 2')
    
thread = Thread(target=consumer)
thread.start()

queue.put(object())
print('Producer put 1')
queue.put(object())
print('Producer put 2')
thread.join()
print('Producer done')

Producer put 1
Consumer got 1
Producer put 2
Consumer got 2
Producer done


대기 결과로 consumer 스레드에서 get을 호출하기 전에 생산 스레드에서 put으로 객체 두 개를 큐에 집어넣는 동작이 일어나야 한다. 하지만 Queue의 크기가 1이다. 

    두번째 put 호출이 블록된 상태에서 빠져나와서 두 번재 아이템을 큐에 넣을 수 있으려면, 큐에 아이템을 추가하는 생산자는 소비 스레드에서 적어도 한 번은 get을 호출하길 기다려야 한다.

In [34]:
in_queue = Queue()

def consumer():
    print('Consumer waiting')
    work = in_queue.get()
    print('Consumer working')
    # 작업을 수행함
    # ...
    print('Consumer done')
    in_queue.task_done()
    
Thread(target=consumer).start()

Consumer waiting


In [35]:
in_queue.put(object())
print('Producer waiting')
in_queue.join()
print('Producer done')

Producer waiting
Consumer working
Consumer done
Producer done


이런 모든 동작을 Queue 서브 클래스에 넣고, 처리를 중단해야 할 때 작업 스레드에 알리는 기능도 추가하자. 다음은 close 메서드를 정의하여 더는 입력 아이템이 없음을 알리는 특별한 아이템을 큐에 추가하는 코드다.

In [37]:
class ClosableQueue(Queue):
    SENTINEL = object()
    
    def close(self):
        self.put(self.SENTINEL)
        
    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return # 스레드 종료
                yield item
            finally:
                self.task_done()