# 시간이 올래 걸리는 작업 큐 처리 방법

- 참고 : https://blog.storyg.co/rabbitmqs/tutorials/python/02-work-queue
- 기본적으로 라운드 로빈 정책에 따라 메시지가 전달됩니다.
- 작업 큐 뒤에 있는 가정은 각 작업이 정확하게 하나의 작업자에게 전달 된다는 것입니다. 


In [31]:
! rabbitmqctl --version

3.10.7


## new_task.py

In [48]:
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# durable=True를 하면 RabbitMQ가 종료되거나 충돌 되어도 Queue가 유지됩니다.
channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print(" [+] Sent %r" % message)
connection.close()

 [+] Sent '-f /Users/jpark/Library/Jupyter/runtime/kernel-3d5f83ef-f479-40ed-92ee-049eb5e7cbfe.json'


In [49]:
! rabbitmqctl list_queues

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name	messages
task_queue	3
hello	0


In [67]:
! rabbitmqadmin list queues

+------------+----------+
|    name    | messages |
+------------+----------+
| hello      | 0        |
| task_queue | 7        |
+------------+----------+


## worker.py

In [69]:
#!/usr/bin/env python
import pika
import time

def worker():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # durable=True를 하면 RabbitMQ가 종료되거나 충돌 되어도 Queue가 유지됩니다.
    channel.queue_declare(queue='task_queue', durable=True)
    print(' [*] Waiting for messages. To exit press CTRL+C')

    def callback(ch, method, properties, body):
        print(" [+] Received %r" % body)
        
        # 보낸 메시지에서 "."의 개수만큼 time.sleep을 수행합니다.
        time.sleep(body.count(b'.')) # 시간이 오래 걸리는 상황을 모사합니다.
        print(" [+] Done")
        ch.basic_ack(delivery_tag = method.delivery_tag)

        
    channel.basic_qos(prefetch_count=1)
    
    #channel.basic_consume(callback,
    #                      queue='task_queue')
    
    # jpark
    channel.basic_consume(on_message_callback=callback, queue='task_queue', auto_ack=False)

    channel.start_consuming()


try:
    worker()
except KeyboardInterrupt:
    print('Interrupted')

In [70]:
try:
    worker()
except KeyboardInterrupt:
    print('Interrupted')

 [*] Waiting for messages. To exit press CTRL+C
 [+] Received b'Hello World!'
 [+] Done
 [+] Received b'Hello World!'
 [+] Done
 [+] Received b'-f /Users/jpark/Library/Jupyter/runtime/kernel-3d5f83ef-f479-40ed-92ee-049eb5e7cbfe.json'
 [+] Done
 [+] Received b'Hello World!'
 [+] Done
 [+] Received b'Hello World!'
 [+] Done
 [+] Received b'Hello World!'
 [+] Done
 [+] Received b'Hello World!'
 [+] Done
Interrupted


In [72]:
! rabbitmqctl list_queues name messages_ready messages_unacknowledged

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name	messages_ready	messages_unacknowledged
task_queue	0	0
hello	0	0
