### 12.03. Communicating Between Threads

In [3]:
from queue import Queue
from threading import Thread
import time

# A thread that produces data
def producer(out_q):
    # while True:
    for i in range(10):
        # Produce some data
        data = f'PUT: mesage index {i}'
        out_q.put(data)
        time.sleep(2)

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data = in_q.get()
        # Process the data
        print('GET:', data)

# Create the shared queue and launch both threads
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()


GET: PUT: mesage index 0


GET: PUT: mesage index 1
GET: PUT: mesage index 2
GET: PUT: mesage index 3
GET: PUT: mesage index 4
GET: PUT: mesage index 5
GET: PUT: mesage index 6
GET: PUT: mesage index 7
GET: PUT: mesage index 8
GET: PUT: mesage index 9


In [5]:
from queue import Queue
from threading import Thread
import time

# Object that signals shutdown
_sentinel = object()

# A thread that produces data
def producer(out_q):
    # while running:
    for i in range(10):
        # Produce some data
        data = f'PUT: mesage index {i}'
        out_q.put(data)
        time.sleep(1)
        
    # Put the sentinel on the queue to indicate completion
    out_q.put(_sentinel)

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data = in_q.get()
        print('GET:', data)

        # Check for termination
        if data is _sentinel:
            in_q.put(_sentinel)
            break

In [6]:
import heapq
import threading

class PriorityQueue:
    def __init__(self):
        self._queue = []
        self._count = 0
        self._cv = threading.Condition()
 
    def put(self, item, priority):
        with self._cv:
            heapq.heappush(self._queue, (-priority, self._count, item))
            self._count += 1
            self._cv.notify()
 
    def get(self):
        with self._cv:
            while len(self._queue) == 0:
                self._cv.wait()
            return heapq.heappop(self._queue)[-1]

In [9]:
from queue import Queue
from threading import Thread
import time

# A thread that produces data
def producer(out_q):
    # while running:
    for i in range(10):
        # Produce some data
        data = f'PUT: mesage index {i}'
        out_q.put(data)
        time.sleep(1)

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data = in_q.get()
        print('GET:', data)
         # Process the data
        # Indicate completion
        in_q.task_done()

# Create the shared queue and launch both threads
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()

# Wait for all produced items to be consumed
q.join()


GET: PUT: mesage index 0


GET: PUT: mesage index 1
GET: PUT: mesage index 2
GET: PUT: mesage index 3
GET: PUT: mesage index 4
GET: PUT: mesage index 5
GET: PUT: mesage index 6
GET: PUT: mesage index 7
GET: PUT: mesage index 8
GET: PUT: mesage index 9


In [10]:
from queue import Queue
from threading import Thread, Event

# A thread that produces data
def producer(out_q):
    # while running:
    for i in range(10):
        # Produce some data
 
        data = f'PUT: mesage index {i}'

        # Make an (data, event) pair and hand it to the consumer
        evt = Event()
        out_q.put((data, evt))
 
        # Wait for the consumer to process the item
        evt.wait()

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data, evt = in_q.get()
        # Process the data
        print('GET:', data)
        
        # Indicate completion
        evt.set()

In [11]:
from queue import Queue
from threading import Thread
import copy

# A thread that produces data
def producer(out_q):
    # while True:
    for i in range(10):
    # Produce some data
        data = f'PUT: mesage index {i}'
        out_q.put(copy.deepcopy(data))

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data = in_q.get()
        print('GET:', data)
        # Process the data

In [12]:
import queue
q = queue.Queue()

try:
    data = q.get(block=False)
except queue.Empty:
    pass

item = 'Hello, world!!'

try:
    q.put(item, block=False)
except queue.Full:
    pass

try:
    data = q.get(timeout=5.0)
except queue.Empty:
    pass

In [None]:
import logging
def producer(q):
    ''' proucer function '''
    try:
        q.put(item, block=False)
    except queue.Full:
        logging.warning('queued item %r discarded!', item)

In [None]:
_running = True

def consumer(q):
    ''' consumer function '''
    while _running:
        try:
            item = q.get(timeout=5.0)
            # Process item
 
        except queue.Empty:
            pass


In [13]:
print('Hello, world!!')

Hello, world!!
