# queue — Thread-safe FIFO Implementation

Purpose: Provides a thread-safe FIFO implementation.

The queue module provides a first-in, first-out (FIFO) data structure suitable for multi-threaded programming. It can be used to pass messages or other data safely between producer and consumer threads. Locking is handled for the caller, so many threads can share the same Queue instance. The size of a Queue (the number of elements it contains) may be restricted to throttle memory usage or processing.
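As a minimal sketch of the size restriction mentioned above, the example below creates a Queue with `maxsize=2`, so the producer blocks in `put()` until the consumer makes room. The `STOP` sentinel, the `consumer()` function, and the doubling step are assumptions made for illustration, not part of the module.

```python
import queue
import threading

q = queue.Queue(maxsize=2)   # at most two items may wait at once
results = []
STOP = None                  # sentinel chosen for this sketch


def consumer():
    while True:
        item = q.get()
        try:
            if item is STOP:
                break
            results.append(item * 2)
        finally:
            q.task_done()    # runs for every item, including the sentinel


t = threading.Thread(target=consumer)
t.start()

for i in range(5):
    q.put(i)                 # blocks whenever two items are already waiting
q.put(STOP)
q.join()
t.join()
print(results)

# A non-blocking put on a full queue raises queue.Full instead of waiting.
full = queue.Queue(maxsize=1)
full.put('a')
try:
    full.put_nowait('b')
    rejected = False
except queue.Full:
    rejected = True
print(rejected)
```

With a single consumer the results come out in insertion order, and the second `put_nowait()` is rejected because the one-slot queue is already full.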

## Basic FIFO Queue

The Queue class implements a basic first-in, first-out container. Elements are added to one “end” of the sequence using put(), and removed from the other end using get().

In [9]:
# queue_fifo.py

import queue

q = queue.Queue()

for i in range(10):
    q.put(i)

while not q.empty():
    print(q.get(), end=' ')
print()

0 1 2 3 4 5 6 7 8 9 
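The `empty()` check above is safe because only one thread touches the queue. When several consumers share a queue, another thread may remove the last item between the `empty()` call and the `get()`, leaving `get()` blocked. One possible alternative, sketched here with a hypothetical `drained` list, is to call `get_nowait()` and stop on `queue.Empty`.

```python
import queue

q = queue.Queue()
for i in range(3):
    q.put(i)

drained = []
while True:
    try:
        # get_nowait() never blocks; it raises queue.Empty instead.
        drained.append(q.get_nowait())
    except queue.Empty:
        break

print(drained)
```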


## LIFO Queue

In contrast to the standard FIFO implementation of Queue, the LifoQueue uses last-in, first-out ordering (normally associated with a stack data structure).

In [10]:
# queue_lifo.py

import queue

q = queue.LifoQueue()

for i in range(10):
    q.put(i)

while not q.empty():
    print(q.get(), end=' ')
print()

9 8 7 6 5 4 3 2 1 0 


## Priority Queue

Sometimes the processing order of the items in a queue needs to be based on characteristics of those items, rather than just the order in which they are created or added to the queue.

For example, print jobs from the payroll department may take precedence over a code listing printed by a developer.

PriorityQueue uses the sort order of the contents of the queue to decide which item to retrieve.

In [24]:
# queue_priority.py

import functools
import queue
import threading


@functools.total_ordering
class Job:

    def __init__(self, priority, description):
        self.priority = priority
        self.description = description
        print('Creating New job: ({}) {}'.format(priority, description))

    def __eq__(self, other):
        try:
            return self.priority == other.priority
        except AttributeError:
            return NotImplemented

    def __lt__(self, other):
        try:
            return self.priority < other.priority
        except AttributeError:
            return NotImplemented


q = queue.PriorityQueue()

q.put(Job(3, 'Mid-level job'))
q.put(Job(10, 'Low-level job'))
q.put(Job(1, 'Important job'))


def process_job(q):
    while True:
        next_job = q.get()
        print('Processing job: ({})'.format(next_job.priority), next_job.description)
        q.task_done()

workers = [
    threading.Thread(target=process_job, args=(q,)),
    threading.Thread(target=process_job, args=(q,)),
]
for w in workers:
    w.daemon = True  # setDaemon() is deprecated; set the attribute instead
    w.start()

q.join()

Creating New job: (3) Mid-level job
Creating New job: (10) Low-level job
Creating New job: (1) Important job
Processing job: (1) Important job
Processing job: (3) Mid-level job
Processing job: (10) Low-level job
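A custom class is not the only way to give PriorityQueue something to sort. As a sketch of a common alternative, the entries below are plain tuples of `(priority, sequence number, description)`. The sequence number from `itertools.count()` is an assumption added here so that entries with equal priority come out in insertion order; the extra "Another mid-level job" entry is hypothetical and exists only to show the tie-break.

```python
import itertools
import queue

q = queue.PriorityQueue()
counter = itertools.count()  # tie-breaker for equal priorities

jobs = [
    (3, 'Mid-level job'),
    (10, 'Low-level job'),
    (1, 'Important job'),
    (3, 'Another mid-level job'),
]
for priority, description in jobs:
    # Tuples compare element by element: priority first, then sequence.
    q.put((priority, next(counter), description))

order = []
while not q.empty():
    priority, _, description = q.get()
    order.append(description)
    print('({}) {}'.format(priority, description))
```

The lowest priority value is retrieved first, matching the behavior of the Job class above.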


## Building a Threaded Podcast Client

The source code for the podcasting client in this section demonstrates how to use the Queue class with multiple threads. The program reads one or more RSS feeds, queues up the enclosures for the five most recent episodes from each feed to be downloaded, and processes several downloads in parallel using threads. It does not have enough error handling for production use, but the skeleton implementation provides an example of using the queue module.

In [96]:
# fetch_podcasts.py
from queue import Queue
import threading
import time
import urllib.request
from urllib.parse import urlparse

import os

import feedparser

# Set up some global variables
num_fetch_threads = 2
enclosure_queue = Queue()

# A real app wouldn't use hard-coded data...
feed_urls = [
    'http://talkpython.fm/episodes/rss',
]


def message(s):
    print('\n{}: {}'.format(threading.current_thread().name, s))

In [97]:
def download_enclosures(q):
    """This is the worker thread function.
    It processes items in the queue one after
    another.  These daemon threads go into an
    infinite loop, and only exit when
    the main thread ends.
    """
    while True:
        message('looking for the next enclosure')
        url = q.get()
        filebase = './test'  # download directory under the current path
        os.makedirs(filebase, exist_ok=True)  # create it if it is missing
        filename = url.rpartition('/')[-1]
        filename_store = os.path.join(filebase, filename)
        message('filename_store: {}'.format(filename_store))
        message('downloading {}'.format(filename))
        response = urllib.request.urlopen(url)
        data = response.read()
        # Save the downloaded file under filebase
        message('writing to {}'.format(filename_store))
        with open(filename_store, 'wb') as outfile:
            outfile.write(data)
        q.task_done()
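One gap in the worker above is that an exception during the download ends the thread before `task_done()` is called, so a later `join()` would wait forever. The sketch below shows one way to guard against that with `try`/`finally`. It avoids the network entirely: `fetch()` is a hypothetical stand-in for the download step, and the `example.com` URLs and the `results` and `errors` dictionaries are assumptions for illustration.

```python
import queue
import threading


def fetch(url):
    # Hypothetical stand-in for the download step; fails on "bad" URLs.
    if 'bad' in url:
        raise ValueError('cannot fetch {}'.format(url))
    return b'data for ' + url.encode()


def worker(q, results, errors):
    while True:
        url = q.get()
        try:
            results[url] = fetch(url)
        except Exception as err:
            errors[url] = err   # record the failure and keep the thread alive
        finally:
            q.task_done()       # always mark the item done so join() returns


q = queue.Queue()
results, errors = {}, {}
for i in range(2):
    threading.Thread(
        target=worker,
        args=(q, results, errors),
        daemon=True,
    ).start()

for url in ['http://example.com/a.mp3', 'http://example.com/bad.mp3']:
    q.put(url)

q.join()
print(sorted(results), sorted(errors))
```

Because `task_done()` sits in the `finally` clause, `join()` returns even though one of the two items failed.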

In [98]:
# Set up some threads to fetch the enclosures
for i in range(num_fetch_threads):
    worker = threading.Thread(
        target=download_enclosures,
        args=(enclosure_queue,),
        name='worker-{}'.format(i),
    )
    worker.daemon = True  # setDaemon() is deprecated; set the attribute instead
    worker.start()


worker-0: looking for the next enclosure

worker-1: looking for the next enclosure


In [99]:
# Download the feed(s) and put the enclosure URLs into
# the queue.
for url in feed_urls:
    response = feedparser.parse(url, agent='fetch_podcasts.py')
    for entry in response['entries'][:5]:
        for enclosure in entry.get('enclosures', []):
            parsed_url = urlparse(enclosure['url'])
            message('queuing {}'.format(
                parsed_url.path.rpartition('/')[-1]))
            enclosure_queue.put(enclosure['url'])


MainThread: queuing grumpy-running-python-on-go.mp3

MainThread: queuing guarenteed-packages-via-conda-and-conda-forge.mp3
worker-1: filename_store: ./test/grumpy-running-python-on-go.mp3
worker-0: filename_store: ./test/guarenteed-packages-via-conda-and-conda-forge.mp3



MainThread: queuing spreading-python-through-the-sciences-with-software-carpentry.mp3
worker-1: downloading grumpy-running-python-on-go.mp3
worker-0: downloading guarenteed-packages-via-conda-and-conda-forge.mp3



MainThread: queuing bonus-python-bytes-crossover-python-3.6-is-going-to-be-awesome-kite-your-friendly-co-developing-ai.mp3

MainThread: queuing top-10-data-science-stories-of-2016.mp3

worker-0: writing to ./test/guarenteed-packages-via-conda-and-conda-forge.mp3

worker-0: looking for the next enclosure

worker-0: filename_store: ./test/spreading-python-through-the-sciences-with-software-carpentry.mp3

worker-0: downloading spreading-python-through-the-sciences-with-software-carpentry.mp3

worker-1: writi

In [100]:
# The only thing left to do is wait, using join(), until every
# queued item has been marked done with task_done(), indicating
# that we have processed all of the downloads.
message('*** main thread waiting')
enclosure_queue.join()
message('*** done')


MainThread: *** main thread waiting

MainThread: *** done
