## 多线程并发消费

In [1]:
import time
import queue
import random
import threading
import functools

In [2]:
def producer(q: queue.Queue, item_num=100) -> None:
    """Producer: put the integers 0 .. item_num - 1 onto ``q`` in order.

    Each item is added with a plain ``q.put(...)`` call, i.e. with the
    queue's default blocking behaviour.
    """
    enqueue = q.put
    for value in range(item_num):
        enqueue(value)

In [3]:
def consumer(q: queue.Queue, timeout: int = 3) -> None:
    """Consumer: keep taking items from ``q`` until it stays empty.

    Each fetch blocks for at most ``timeout`` seconds. When no item
    arrives within that window (``queue.Empty``), the consumer returns.
    The fetched item itself is discarded; only the processing delay is
    simulated.
    """
    while True:
        try:
            q.get(block=True, timeout=timeout)
        except queue.Empty:
            # Nothing arrived within the timeout window: stop consuming.
            return

        # Sleep for random() seconds to simulate processing time.
        time.sleep(random.random())

In [4]:
def debug_run_time(f):
    """Decorator that prints how long each call to ``f`` takes.

    The wrapped function is called with the original positional and
    keyword arguments and its return value is passed through unchanged.
    After a successful call, a line of the form ``elapse <seconds> seconds``
    is printed. If ``f`` raises, the exception propagates and nothing is
    printed.
    """
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        # perf_counter() is a monotonic, high-resolution clock, so the
        # measured interval is not affected by system clock adjustments
        # the way time.time() differences can be.
        start_time = time.perf_counter()
        ret = f(*args, **kwargs)
        elapsed = time.perf_counter() - start_time

        print(f'elapse {elapsed} seconds')

        return ret

    return wrapper

In [5]:
@debug_run_time
def producer_consumer(concurrent=10, item_num=100) -> None:
    """Run one producer thread against ``concurrent`` consumer threads.

    A queue bounded to 10 slots connects the two sides. The producer
    thread is started first and pushes ``item_num`` items; the consumer
    threads are then created and started. The call returns once the
    producer and every consumer thread have finished.
    """
    channel = queue.Queue(maxsize=10)

    feeder = threading.Thread(target=producer, args=(channel, item_num))
    feeder.start()

    # Build every consumer thread before starting any of them.
    workers = [
        threading.Thread(target=consumer, args=(channel,))
        for _ in range(concurrent)
    ]
    for worker in workers:
        worker.start()

    # Wait for the producer first, then for each consumer in creation order.
    for thread in (feeder, *workers):
        thread.join()

In [6]:
# Benchmark: 10 consumer threads, 1000 items.
producer_consumer(concurrent=10, item_num=1000)

elapse 53.591537952423096 seconds


In [7]:
# Benchmark: 100 consumer threads, 1000 items.
producer_consumer(concurrent=100, item_num=1000)

elapse 8.437967777252197 seconds


In [8]:
# Benchmark: 1000 consumer threads, 1000 items.
producer_consumer(concurrent=1000, item_num=1000)

elapse 4.077296018600464 seconds
