### Start a Thread
https://realpython.com/intro-to-python-threading/
To start a separate thread, you create a Thread instance and then tell it to .start():

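When you create a Thread, you pass it a function (target) and a tuple containing the arguments to that function (args). In this case, you’re telling the Thread to run thread_function() and to pass it 1 as an argument. The trailing comma in args=(1,) is what makes it a one-element tuple rather than just the number 1.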

For this article, you’ll use sequential integers as names for your threads. There is threading.get_ident(), which returns a unique integer identifier for the thread that calls it, but these identifiers are usually neither short nor easily readable.
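Here is a small sketch of what threading.get_ident() actually returns. It starts two threads and has each one record its own identifier. It also uses a threading.Barrier, which is not part of the original example, so that both threads are alive at the same moment. Identifiers are only guaranteed to be unique among live threads, and they may be recycled after a thread exits.

```python
import threading

idents = {}                      # label -> identifier reported by that thread
barrier = threading.Barrier(2)   # keeps both workers alive at the same time

def record_ident(label):
    # get_ident() returns a nonzero integer for the *calling* thread
    idents[label] = threading.get_ident()
    # Wait until the other worker has recorded its identifier too, so the two
    # threads overlap and cannot end up sharing a recycled identifier.
    barrier.wait()

threads = [threading.Thread(target=record_ident, args=(i,)) for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Usually large integers, which is why the notes use 0, 1, 2... as names
print(idents)
print("main thread:", threading.get_ident())
```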

thread_function() itself doesn’t do much. It simply logs some messages with a time.sleep() in between them.

In [1]:
import logging
import threading
import time

def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    logging.info("Main    : before creating thread")
    x = threading.Thread(target=thread_function, args=(1,))
    logging.info("Main    : before running thread")
    x.start()
    logging.info("Main    : wait for the thread to finish")
    # x.join()
    logging.info("Main    : all done")

23:52:05: Main    : before creating thread
23:52:05: Main    : before running thread
23:52:05: Thread 1: starting
23:52:05: Main    : wait for the thread to finish
23:52:05: Main    : all done
23:52:07: Thread 1: finishing
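If you uncomment x.join(), the main thread blocks until the worker has finished, so "all done" comes last. The sketch below shows that ordering without logging. As a simplification for illustration, it appends events to a shared list, and it shortens the sleep to 0.2 seconds.

```python
import threading
import time

events = []  # shared list that records the order of events

def thread_function(name):
    events.append(f"thread {name} starting")
    time.sleep(0.2)
    events.append(f"thread {name} finishing")

x = threading.Thread(target=thread_function, args=(1,))
x.start()
x.join()  # block the main thread until x has finished
events.append("main all done")
print(events)
# ['thread 1 starting', 'thread 1 finishing', 'main all done']
```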


### Daemon

In computer science, a daemon is a process that runs in the background. Python threading gives the word a more specific meaning: a daemon thread is shut down immediately when the program exits. One way to think about a daemon thread is as a thread that runs in the background without you having to worry about shutting it down.

If a program is running threads that are not daemons, the program waits for those threads to complete before it terminates. Daemon threads, however, are simply killed wherever they are when the program exits.

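To tell one thread to wait for another thread to finish, you call .join() on the thread you want to wait for.

The output below was produced in a Jupyter kernel. The kernel keeps running after the cell finishes, so the daemon thread lives long enough to log "Thread 1: finishing". Run as a standalone script, the program would exit right after "Main    : all done" and kill the daemon thread before it logs that line.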

In [2]:
def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    logging.info("Main    : before creating thread")
    x = threading.Thread(target=thread_function, args=(1,), daemon=True)
    logging.info("Main    : before running thread")
    x.start()
    logging.info("Main    : wait for the thread to finish")
    # x.join()
    logging.info("Main    : all done")

23:54:36: Main    : before creating thread
23:54:36: Main    : before running thread
23:54:36: Thread 1: starting
23:54:36: Main    : wait for the thread to finish
23:54:36: Main    : all done
23:54:38: Thread 1: finishing
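A daemon thread is only killed when the interpreter actually exits, and a notebook kernel does not exit after a cell. To see the difference, the sketch below runs a tiny program in a fresh interpreter through subprocess, once with daemon=True and once with daemon=False. The script text and the helper name run() are invented for this illustration.

```python
import subprocess
import sys

# A tiny standalone program: start a worker thread, then let the main thread end.
SCRIPT = """
import threading, time

def worker():
    print("worker starting", flush=True)
    time.sleep(1)
    print("worker finishing", flush=True)

t = threading.Thread(target=worker, daemon={daemon})
t.start()
time.sleep(0.2)  # give the worker time to print its first line
print("main all done", flush=True)
"""

def run(daemon):
    """Run SCRIPT in a fresh interpreter and return what it printed."""
    result = subprocess.run(
        [sys.executable, "-c", SCRIPT.format(daemon=daemon)],
        capture_output=True, text=True, timeout=30,
    )
    return result.stdout

print(run(daemon=True))   # the daemon worker is killed when the program exits
print(run(daemon=False))  # the interpreter waits for the non-daemon worker
```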


## Working With Many Threads

In [3]:
def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    threads = list()
    for index in range(3):
        logging.info("Main    : create and start thread %d.", index)
        x = threading.Thread(target=thread_function, args=(index,))
        threads.append(x)
        x.start()

    for index, thread in enumerate(threads):
        logging.info("Main    : before joining thread %d.", index)
        thread.join()
        logging.info("Main    : thread %d done", index)

23:56:12: Main    : create and start thread 0.
23:56:12: Thread 0: starting
23:56:12: Main    : create and start thread 1.
23:56:12: Thread 1: starting
23:56:12: Main    : create and start thread 2.
23:56:12: Thread 2: starting
23:56:12: Main    : before joining thread 0.
23:56:14: Thread 0: finishing
23:56:14: Main    : thread 0 done
23:56:14: Main    : before joining thread 1.
23:56:14: Thread 1: finishing
23:56:14: Main    : thread 1 done
23:56:14: Main    : before joining thread 2.
23:56:14: Thread 2: finishing
23:56:14: Main    : thread 2 done


### Using a ThreadPoolExecutor

The code creates a ThreadPoolExecutor as a context manager, telling it how many worker threads it wants in the pool. It then uses .map() to step through an iterable of things, in your case range(3), passing each one to a thread in the pool.

The end of the with block causes the ThreadPoolExecutor to do a .join() on each of the threads in the pool, because leaving the block calls executor.shutdown(wait=True). It is strongly recommended that you use ThreadPoolExecutor as a context manager when you can, so that you never forget to .join() the threads.

In [4]:
import concurrent.futures

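# thread_function() and the logging and time imports come from the cells above

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        # map() submits one call per item; its results iterator is not used here
        executor.map(thread_function, range(3))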

23:57:11: Thread 0: starting
23:57:11: Thread 1: starting
23:57:11: Thread 2: starting
23:57:13: Thread 0: finishing
23:57:13: Thread 1: finishing
23:57:13: Thread 2: finishing
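executor.map() returns an iterator of results. Leaving it unread is fine for functions that only log, but any return values are dropped, and so is any exception raised in a worker. The small sketch below uses hypothetical slow_square() and fails() tasks to show two things. First, results come back in input order even when the tasks finish out of order. Second, reading the iterator re-raises a worker's exception.

```python
import concurrent.futures
import time

def slow_square(n):
    # Hypothetical task: later inputs sleep less, so they finish first
    time.sleep(0.1 * (3 - n))
    return n * n

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # list() drains the iterator returned by map(); results come back in
    # input order even though the tasks finish in reverse order
    results = list(executor.map(slow_square, range(3)))
print(results)  # [0, 1, 4]

def fails(n):
    # Hypothetical task that always raises
    raise ValueError(f"bad item {n}")

caught = None
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    results_iter = executor.map(fails, [1])
    try:
        next(results_iter)       # re-raises the worker's exception here
    except ValueError as exc:
        caught = str(exc)
print(caught)  # bad item 1
```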


### Race Conditions

Race conditions can occur when two or more threads access a shared piece of data or resource.

FakeDatabase is keeping track of a single number: .value. This is going to be the shared data on which you’ll see the race condition.

.__init__() simply initializes .value to zero. So far, so good.

.update() looks a little strange. It’s simulating reading a value from a database, doing some computation on it, and then writing a new value back to the database.

In this case, reading from the database just means copying .value to a local variable. The computation is just to add one to the value and then .sleep() for a little bit. Finally, it writes the value back by copying the local value back to .value.

In [8]:
class FakeDatabase:
    def __init__(self):
        self.value = 0

    def update(self, name):
        logging.info("Thread %s: starting update", name)
        local_copy = self.value
        local_copy += 1
        time.sleep(0.1)
        self.value = local_copy
        logging.info("Thread %s: finishing update", name)
        
if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    database = FakeDatabase()
    logging.info("Testing update. Starting value is %d.", database.value)
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        for index in range(2):
            executor.submit(database.update, index)
    logging.info("Testing update. Ending value is %d.", database.value)
    
    """
    The program creates a ThreadPoolExecutor with two threads and then calls .submit() on each of them, telling them to run database.update().

    .submit() has a signature that allows both positional and named arguments to be passed to the function running in the thread: .submit(function, *args, **kwargs)
    
    In the usage above, index is passed as the first and only positional argument to database.update(). You’ll see later in this article where you can pass multiple arguments in a similar manner.

    Since each thread runs .update(), and .update() adds one to .value, you might expect database.value to be 2 when it’s printed out at the end. But you wouldn’t be looking at this example if that was the case.
    """

00:03:33: Testing update. Starting value is 0.
00:03:33: Thread 0: starting update
00:03:33: Thread 1: starting update
00:03:33: Thread 0: finishing update
00:03:33: Thread 1: finishing update
00:03:33: Testing update. Ending value is 1.
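The lost update does not depend on the exact sleep length. The sketch below is a variation, not the article's code. It replaces time.sleep(0.1) with a threading.Barrier, which guarantees that both threads read .value before either one writes it back. That makes the outcome deterministic.

```python
import threading

class FakeDatabase:
    def __init__(self):
        self.value = 0

    def update(self, barrier):
        local_copy = self.value   # read
        local_copy += 1           # modify
        barrier.wait()            # wait until the other thread has also read
        self.value = local_copy   # write: both threads write the same value

database = FakeDatabase()
barrier = threading.Barrier(2)
threads = [threading.Thread(target=database.update, args=(barrier,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(database.value)  # 1, not 2: one increment was lost
```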


To solve the race condition above, you need to allow only one thread at a time into the read-modify-write section of your code. The most common tool for this in Python is threading.Lock. In some other languages the same idea is called a mutex, from MUTual EXclusion, which is exactly what a Lock provides. Only one thread can hold it at a time, and any other thread that tries to acquire it waits until the holder releases it.

In [13]:
class FakeDatabase:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def locked_update(self, name):
        logging.info("Thread %s: starting update", name)
        logging.debug("Thread %s about to lock", name)
        with self._lock:
            logging.debug("Thread %s has lock", name)
            local_copy = self.value
            local_copy += 1
            time.sleep(1)
            self.value = local_copy
            logging.debug("Thread %s about to release lock", name)
        logging.debug("Thread %s after release", name)
        logging.info("Thread %s: finishing update", name)
        
if __name__ == "__main__":
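    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    # Turn on DEBUG output *after* basicConfig(): in a fresh interpreter,
    # basicConfig(level=logging.INFO) would reset a level set before it.
    logging.getLogger().setLevel(logging.DEBUG)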

    database = FakeDatabase()
    logging.info("Testing update. Starting value is %d.", database.value)
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        for index in range(2):
            executor.submit(database.locked_update, index)
    logging.info("Testing update. Ending value is %d.", database.value)        
        

21:50:41: Testing update. Starting value is 0.
21:50:41: Thread 0: starting update
21:50:41: Thread 1: starting update
21:50:41: Thread 0 about to lock
21:50:41: Thread 1 about to lock
21:50:41: Thread 0 has lock
21:50:42: Thread 0 about to release lock
21:50:42: Thread 0 after release
21:50:42: Thread 1 has lock
21:50:42: Thread 0: finishing update
21:50:43: Thread 1 about to release lock
21:50:43: Thread 1 after release
21:50:43: Thread 1: finishing update
21:50:43: Testing update. Ending value is 2.
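A Lock can also be acquired and released by hand. `with self._lock:` is shorthand for an acquire() followed by a release() in a finally clause. The short sketch below uses names invented for illustration. It shows a non-blocking acquire failing while the lock is held, and it shows the read-modify-write pattern from locked_update() staying correct with ten threads. Note that a plain Lock is not reentrant: if the same thread makes a second blocking acquire() call, it deadlocks. threading.RLock exists for that case.

```python
import threading

lock = threading.Lock()

# Equivalent to `with lock:`, written out explicitly
lock.acquire()
try:
    # A second, non-blocking attempt fails while the lock is held
    second_try = lock.acquire(blocking=False)
finally:
    lock.release()

print(second_try)     # False
print(lock.locked())  # False: released again

# The same read-modify-write as locked_update(), with more threads
counter = 0
counter_lock = threading.Lock()

def increment():
    global counter
    with counter_lock:
        local_copy = counter
        local_copy += 1
        counter = local_copy

threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 10
```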


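### Producer-Consumer Using Lock

The producer-consumer problem is a standard way to study thread synchronization. One thread (the producer) generates data, another thread (the consumer) processes it, and the two have to hand the data over safely. Since this is an article about Python threading, and since you just read about the Lock primitive, let’s try to solve this problem with two threads using a Lock or two.

The general design is that a producer thread reads from the fake network and puts each message into a Pipeline, and a consumer thread takes each message out of the Pipeline and pretends to save it to a database: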

In [14]:
import random 

class Pipeline:
    """
    Class to allow a single element pipeline between producer and consumer.
    """
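    def __init__(self):
        self.message = 0
        # The producer holds producer_lock while a message is waiting. The
        # consumer releases it after reading, so an unread message is never
        # overwritten.
        self.producer_lock = threading.Lock()
        # consumer_lock starts out acquired, so the consumer blocks in
        # get_message() until the producer stores a message and releases it.
        self.consumer_lock = threading.Lock()
        self.consumer_lock.acquire()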

    def get_message(self, name):
        logging.debug("%s:about to acquire getlock", name)
        self.consumer_lock.acquire()
        logging.debug("%s:have getlock", name)
        message = self.message
        logging.debug("%s:about to release setlock", name)
        self.producer_lock.release()
        logging.debug("%s:setlock released", name)
        return message

    def set_message(self, message, name):
        logging.debug("%s:about to acquire setlock", name)
        self.producer_lock.acquire()
        logging.debug("%s:have setlock", name)
        self.message = message
        logging.debug("%s:about to release getlock", name)
        self.consumer_lock.release()
        logging.debug("%s:getlock released", name)
        
        
SENTINEL = object()

def producer(pipeline):
    """Pretend we're getting a message from the network."""
    for index in range(10):
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        pipeline.set_message(message, "Producer")

    # Send a sentinel message to tell consumer we're done
    pipeline.set_message(SENTINEL, "Producer")
    
def consumer(pipeline):
    """Pretend we're saving a number in the database."""
    message = 0
    while message is not SENTINEL:
        message = pipeline.get_message("Consumer")
        if message is not SENTINEL:
            logging.info("Consumer storing message: %s", message)
            
if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
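    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
    # logging.getLogger().setLevel(logging.DEBUG)
    # The DEBUG lines in the output below still appear because the previous
    # cell left the root logger at DEBUG in this kernel.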

    pipeline = Pipeline()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline)
        executor.submit(consumer, pipeline)

22:02:56: Producer got message: 62
22:02:56: Consumer:about to acquire getlock
22:02:56: Producer:about to acquire setlock
22:02:56: Producer:have setlock
22:02:56: Producer:about to release getlock
22:02:56: Producer:getlock released
22:02:56: Consumer:have getlock
22:02:56: Producer got message: 95
22:02:56: Consumer:about to release setlock
22:02:56: Producer:about to acquire setlock
22:02:56: Consumer:setlock released
22:02:56: Producer:have setlock
22:02:56: Consumer storing message: 62
22:02:56: Producer:about to release getlock
22:02:56: Consumer:about to acquire getlock
22:02:56: Producer:getlock released
22:02:56: Consumer:have getlock
22:02:56: Producer got message: 36
22:02:56: Consumer:about to release setlock
22:02:57: Producer:about to acquire setlock
22:02:57: Consumer:setlock released
22:02:57: Producer:have setlock
22:02:57: Consumer storing message: 95
22:02:57: Producer:about to release getlock
22:02:57: Consumer:about to acquire getlock
22:02:57: Producer:getlock re
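To check that the two-lock handshake passes every message across exactly once and in order, the sketch below strips the logging from Pipeline. It sends the numbers 0 to 9 instead of random values, and it records what each side sees. It relies on a documented property of threading.Lock: a thread other than the one that acquired it can release it, which is exactly what get_message() and set_message() do.

```python
import threading

SENTINEL = object()

class Pipeline:
    """Same two-lock, single-slot pipeline as above, without the logging."""
    def __init__(self):
        self.message = 0
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        self.consumer_lock.acquire()   # nothing to read yet

    def get_message(self):
        self.consumer_lock.acquire()   # wait for a message
        message = self.message
        self.producer_lock.release()   # let the producer write the next one
        return message

    def set_message(self, message):
        self.producer_lock.acquire()   # wait until the slot is free
        self.message = message
        self.consumer_lock.release()   # let the consumer read it

sent, received = [], []

def producer(pipeline):
    for message in range(10):          # deterministic messages instead of random ones
        sent.append(message)
        pipeline.set_message(message)
    pipeline.set_message(SENTINEL)

def consumer(pipeline):
    while True:
        message = pipeline.get_message()
        if message is SENTINEL:
            break
        received.append(message)

pipeline = Pipeline()
threads = [threading.Thread(target=producer, args=(pipeline,)),
           threading.Thread(target=consumer, args=(pipeline,))]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(received == sent)  # True
```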

### Producer-Consumer Using Queue
If you want to be able to handle more than one value in the pipeline at a time, you’ll need a data structure for the pipeline that lets the number of stored items grow and shrink as data backs up from the producer.

Python’s standard library has a queue module which, in turn, has a Queue class. Let’s change the Pipeline to use a Queue instead of just a variable protected by a Lock. You’ll also use a different way to stop the worker threads by using a different primitive from Python threading, an Event.
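Here is a brief sketch of the queue.Queue behavior the pipeline relies on. Items come out in the order they went in, and a Queue created with maxsize stops accepting items when it is full. The blocking put() and get() wait for room or for data. The put_nowait() and get_nowait() variants used here raise queue.Full and queue.Empty instead, which makes the limits visible without blocking.

```python
import queue

pipeline = queue.Queue(maxsize=2)   # bounded: at most two items can wait

pipeline.put("a")
pipeline.put("b")
full_raised = False
try:
    pipeline.put_nowait("c")        # no room: raises instead of blocking
except queue.Full:
    full_raised = True

first = pipeline.get()              # FIFO: first in, first out
second = pipeline.get()
print(first, second)                # a b

empty_raised = False
try:
    pipeline.get_nowait()           # nothing left: raises instead of blocking
except queue.Empty:
    empty_raised = True
print(full_raised, empty_raised)    # True True
```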

Let’s start with the Event. The threading.Event object allows one thread to signal an event while many other threads wait for that event to happen. The key usage in this code is that the waiting threads do not necessarily need to stop what they are doing; they can just check the status of the Event every once in a while.
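Here is a minimal sketch of the threading.Event calls used in this section. is_set() polls the flag without blocking. wait(timeout) blocks until the flag is set or the timeout expires. set() raises the flag for every thread. The worker() function is a made-up stand-in for a thread that keeps working and only checks the event between units of work.

```python
import threading
import time

event = threading.Event()
print(event.is_set())               # False: a new Event starts cleared

# wait() with a timeout returns False if nobody set the event in time
woke_up = event.wait(timeout=0.05)
print(woke_up)                      # False

def worker():
    # Hypothetical worker: do a small unit of "work", then check the flag
    while not event.is_set():
        time.sleep(0.01)

t = threading.Thread(target=worker)
t.start()
time.sleep(0.1)
event.set()                         # any thread checking the event now sees True
t.join()
print(event.is_set())               # True
```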

The triggering of the event can be many things. In this example, the main thread will simply sleep for a while and then .set() it:

In [15]:
import concurrent.futures
import logging
import queue
import random
import threading
import time

class Pipeline(queue.Queue):
    def __init__(self):
        super().__init__(maxsize=10)

    def get_message(self, name):
        logging.debug("%s:about to get from queue", name)
        value = self.get()
        logging.debug("%s:got %d from queue", name, value)
        return value

    def set_message(self, value, name):
        logging.debug("%s:about to add %d to queue", name, value)
        self.put(value)
        logging.debug("%s:added %d to queue", name, value)


def producer(pipeline, event):
    """Pretend we're getting a number from the network."""
    while not event.is_set():
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        pipeline.set_message(message, "Producer")

    logging.info("Producer received EXIT event. Exiting")


def consumer(pipeline, event):
    """Pretend we're saving a number in the database."""
    while not event.is_set() or not pipeline.empty():
        message = pipeline.get_message("Consumer")
        logging.info(
            "Consumer storing message: %s  (queue size=%s)",
            message,
            pipeline.qsize(),
        )

    logging.info("Consumer received EXIT event. Exiting")


if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    # logging.getLogger().setLevel(logging.DEBUG)

    pipeline = Pipeline()
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)

        time.sleep(0.1)
        logging.info("Main: about to set event")
        event.set()

In [16]:
import concurrent.futures
import logging
import queue
import random
import threading
import time

def producer(queue, event):
    """Pretend we're getting a number from the network."""
    while not event.is_set():
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        queue.put(message)

    logging.info("Producer received event. Exiting")

def consumer(queue, event):
    """Pretend we're saving a number in the database."""
    while not event.is_set() or not queue.empty():
        message = queue.get()
        logging.info(
            "Consumer storing message: %s (size=%d)", message, queue.qsize()
        )

    logging.info("Consumer received event. Exiting")

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    pipeline = queue.Queue(maxsize=10)
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)

        time.sleep(0.1)
        logging.info("Main: about to set event")
        event.set()

22:40:45: Producer got message: 59
22:40:45: Producer got message: 98
22:40:45: Consumer storing message: 59 (size=0)
22:40:45: Producer got message: 40
22:40:45: Consumer storing message: 98 (size=0)
22:40:45: Producer got message: 28
22:40:45: Consumer storing message: 40 (size=0)
22:40:45: Producer got message: 65
22:40:45: Consumer storing message: 28 (size=0)
22:40:45: Producer got message: 100
22:40:45: Consumer storing message: 65 (size=0)
22:40:45: Producer got message: 88
22:40:45: Consumer storing message: 100 (size=0)
22:40:45: Producer got message: 20
22:40:45: Consumer storing message: 88 (size=0)
22:40:45: Producer got message: 66
22:40:45: Consumer storing message: 20 (size=0)
22:40:45: Producer got message: 12
22:40:45: Consumer storing message: 66 (size=0)
22:40:45: Producer got message: 12
22:40:45: Consumer storing message: 12 (size=0)
22:40:45: Producer got message: 91
22:40:45: Consumer storing message: 12 (size=0)
22:40:45: Producer got message: 78
22:40:45: Consu
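Both consumer() versions share one hazard. The consumer can check the loop condition while the event is still clear and then call get() just as the producer sees the event and exits. If the queue is empty at that moment, get() blocks forever and the with block never finishes. The chances are small here because the producer keeps the queue well stocked. Still, a common defensive pattern is to give get() a timeout and re-check the loop condition on queue.Empty. The sketch below assumes a 0.05-second timeout and a short sleep in the producer. Both values are arbitrary choices for the illustration, and the stored list is a stand-in for the database.

```python
import concurrent.futures
import queue
import random
import threading
import time

def producer(pipeline, event):
    """Pretend we're getting a number from the network."""
    while not event.is_set():
        pipeline.put(random.randint(1, 101))
        time.sleep(0.001)  # assumed pacing so the demo does not spin flat out

def consumer(pipeline, event, stored):
    """Pretend we're saving numbers; `stored` stands in for the database."""
    while not event.is_set() or not pipeline.empty():
        try:
            # Wait at most 0.05 s, then loop back and re-check the condition
            message = pipeline.get(timeout=0.05)
        except queue.Empty:
            continue
        stored.append(message)

pipeline = queue.Queue(maxsize=10)
event = threading.Event()
stored = []
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(producer, pipeline, event)
    executor.submit(consumer, pipeline, event, stored)
    time.sleep(0.1)
    event.set()

print(len(stored), "messages stored")
```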