In [1]:
import logging
import threading
import time
import concurrent.futures
import queue

## Threading with one thread

In [2]:
def thread_function(name):
    """Simulate a unit of work: log the start, idle for two seconds, log the end.

    Args:
        name: Label used to identify this thread in the log lines.
    """
    work_seconds = 2
    logging.info("Thread %s: starting", name)
    time.sleep(work_seconds)  # stand-in for real work
    logging.info("Thread %s: finishing", name)

if __name__ == "__main__":
    # Demo: run thread_function on a single worker thread and wait for it.
    # NOTE(review): `format` shadows the builtin of the same name for the rest
    # of the notebook session.
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    logging.info("Main    : before creating thread")
    # The thread is created as a daemon, but the explicit join() below still
    # waits for it to finish before "all done" is logged.
    x = threading.Thread(target=thread_function, args=(1,),daemon = True)
    logging.info("Main    : before running thread")
    x.start()
    logging.info("Main    : wait for the thread to finish")
    # Block the main thread until the worker returns.
    x.join()
    logging.info("Main    : all done")

11:23:37: Main    : before creating thread
11:23:37: Main    : before running thread
11:23:37: Thread 1: starting
11:23:37: Main    : wait for the thread to finish
11:23:39: Thread 1: finishing
11:23:39: Main    : all done


## Threading with multiple threads (3)

In [3]:
import logging
import threading
import time

def thread_function(name):
    """Worker body for the multi-thread demo.

    Logs a start line, sleeps for two seconds, then logs a finish line.
    `name` is interpolated into both log lines to tell the threads apart.
    """
    announce = logging.info
    announce("Thread %s: starting", name)
    time.sleep(2)
    announce("Thread %s: finishing", name)

if __name__ == "__main__":
    # Demo: start three worker threads, then join them in creation order.
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    # Keep every Thread object so it can be joined in the second loop.
    threads = list()
    for index in range(3):
        logging.info("Main    : create and start thread %d.", index)
        x = threading.Thread(target=thread_function, args=(index,))
        threads.append(x)
        # Each thread is started immediately, so all three run concurrently.
        x.start()

    # Joins happen in list order; join() returns at once for a thread that
    # has already finished.
    for index, thread in enumerate(threads):
        logging.info("Main    : before joining thread %d.", index)
        thread.join()
        logging.info("Main    : thread %d done", index)

11:23:39: Main    : create and start thread 0.
11:23:39: Thread 0: starting
11:23:39: Main    : create and start thread 1.
11:23:39: Thread 1: starting
11:23:39: Main    : create and start thread 2.
11:23:39: Thread 2: starting
11:23:39: Main    : before joining thread 0.
11:23:41: Thread 0: finishing
11:23:41: Main    : thread 0 done
11:23:41: Main    : before joining thread 1.
11:23:41: Thread 1: finishing
11:23:41: Main    : thread 1 done
11:23:41: Main    : before joining thread 2.
11:23:41: Thread 2: finishing
11:23:41: Main    : thread 2 done


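## Threading using ThreadPoolExecutor (race condition)
#### Both workers read `value` before either one writes it back, so one increment is lost and the ending value is 1 instead of 2.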

In [4]:
class FakeDatabase:
    """Toy shared store whose update is deliberately not synchronised."""

    def __init__(self):
        self.value = 0

    def update(self, name):
        """Increment ``value`` with an unsynchronised read / sleep / write.

        The new value is computed from a snapshot taken before the sleep and
        only written back afterwards, so concurrent callers can overwrite
        each other's increments.

        Args:
            name: Label identifying the calling thread in the log lines.
        """
        logging.info("Thread %s: starting update", name)
        incremented = self.value + 1  # based on the value seen *before* sleeping
        time.sleep(0.1)
        self.value = incremented
        logging.info("Thread %s: finishing update", name)
if __name__ == "__main__":
    # Demo: two pool workers call the unsynchronised update() on one object.
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    database = FakeDatabase()
    logging.info("Testing update. Starting value is %d.", database.value)
    # Leaving the `with` block waits for both submitted calls to complete,
    # so the final log line reports the value after both updates ran.
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        for index in range(2):
            executor.submit(database.update, index)
    logging.info("Testing update. Ending value is %d.", database.value)

11:23:41: Testing update. Starting value is 0.
11:23:41: Thread 0: starting update
11:23:41: Thread 1: starting update
11:23:41: Thread 0: finishing update
11:23:41: Thread 1: finishing update
11:23:41: Testing update. Ending value is 1.


## Threading using LOCK
#### In this output you can see Thread 0 acquires the lock and is still holding it when it goes to sleep. Thread 1 then starts and attempts to acquire the same lock. Because Thread 0 is still holding it, Thread 1 has to wait. This is the mutual exclusion that a Lock provides.

In [5]:
class FakeDatabase:
    """Toy shared store whose update is serialised by a ``threading.Lock``."""

    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def locked_update(self, name):
        """Increment ``value`` while holding the instance lock.

        The read, the sleep and the write-back all happen with the lock held,
        so only one caller at a time can be inside the critical section.

        Args:
            name: Label identifying the calling thread in the log lines.
        """
        logging.info("Thread %s: starting update", name)
        logging.debug("Thread %s about to lock", name)
        self._lock.acquire()
        try:
            logging.debug("Thread %s has lock", name)
            incremented = self.value + 1
            time.sleep(0.1)
            self.value = incremented
            logging.debug("Thread %s about to release lock", name)
        finally:
            # Always hand the lock back, even if the body raised.
            self._lock.release()
        logging.debug("Thread %s after release", name)
        logging.info("Thread %s: finishing update", name)
if __name__ == "__main__":
    # Demo: two pool workers call locked_update() on one shared object; the
    # DEBUG log lines show the lock being acquired and released.
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    # Raise verbosity AFTER basicConfig(). On a fresh kernel basicConfig()
    # sets the root level to INFO, which would undo an earlier setLevel(DEBUG)
    # and hide the lock trace. When a handler is already installed,
    # basicConfig() does nothing, so this order works in both cases.
    logging.getLogger().setLevel(logging.DEBUG)

    database = FakeDatabase()
    logging.info("Testing update. Starting value is %d.", database.value)
    # Leaving the `with` block waits for both submitted calls to complete.
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        for index in range(2):
            executor.submit(database.locked_update, index)
    logging.info("Testing update. Ending value is %d.", database.value)

11:23:42: Testing update. Starting value is 0.
11:23:42: Thread 0: starting update
11:23:42: Thread 0 about to lock
11:23:42: Thread 0 has lock
11:23:42: Thread 1: starting update
11:23:42: Thread 1 about to lock
11:23:42: Thread 0 about to release lock
11:23:42: Thread 0 after release
11:23:42: Thread 1 has lock
11:23:42: Thread 0: finishing update
11:23:43: Thread 1 about to release lock
11:23:43: Thread 1 after release
11:23:43: Thread 1: finishing update
11:23:43: Testing update. Ending value is 2.


## Deadlock Example
### When the program calls `l.acquire()` the second time, it hangs waiting for the `Lock` to be released. In this example, you can fix the deadlock by removing the second call.

### Output
```
before first acquire
before second acquire
```

In [None]:
import threading

# Deliberate deadlock demonstration: the same thread acquires one Lock twice.
# WARNING: running this cell never completes; interrupt the kernel to stop it.
l = threading.Lock()
print("before first acquire")
l.acquire()
print("before second acquire")
# The lock is already held and is never released, so this call blocks forever.
l.acquire()
# Never reached.
print("acquired lock twice")

## Producer Consumer Problem using LOCK
#### Not reliable: the pipeline holds only one message at a time, so it can fail in case of a burst of messages.

In [6]:
import random 

# Unique end-of-stream marker; the consumer compares against it by identity.
SENTINEL = object()

def producer(pipeline):
    """Feed ten random integers into ``pipeline``, then the end marker.

    Each number is drawn with ``random.randint(1, 101)``, logged, and handed
    over via ``pipeline.set_message(value, "Producer")``.
    """
    for _ in range(10):
        payload = random.randint(1, 101)
        logging.info("Producer got message: %s", payload)
        pipeline.set_message(payload, "Producer")

    # Tell the consumer that no further messages will arrive.
    pipeline.set_message(SENTINEL, "Producer")
def consumer(pipeline):
    """Drain ``pipeline`` until the end marker arrives, logging each message.

    Messages are fetched with ``pipeline.get_message("Consumer")``; the
    ``SENTINEL`` object itself is never logged.
    """
    received = 0
    while received is not SENTINEL:
        received = pipeline.get_message("Consumer")
        if received is SENTINEL:
            # End of stream: skip logging and let the loop condition end the loop.
            continue
        logging.info("Consumer storing message: %s", received)
class Pipeline:
    """
    Class to allow a single element pipeline between producer and consumer.
    """
    def __init__(self):
        # Single shared slot: written by set_message(), read by get_message().
        self.message = 0
        # Acquired by set_message() before writing; released by get_message()
        # after reading, which allows the next write.
        self.producer_lock = threading.Lock()
        # Acquired by get_message() before reading; released by set_message()
        # after writing, which allows the next read.
        self.consumer_lock = threading.Lock()
        # Start with the consumer lock held so the first get_message() blocks
        # until set_message() has stored something.
        self.consumer_lock.acquire()

    def get_message(self, name):
        """Wait for a stored message, return it, and allow the next write.

        `name` is only used to label the debug log lines.
        """
        logging.debug("%s:about to acquire getlock", name)
        # Blocks until set_message() releases the consumer lock.
        self.consumer_lock.acquire()
        logging.debug("%s:have getlock", name)
        # Copy the slot before releasing the producer lock; after the release
        # a writer may overwrite self.message.
        message = self.message
        logging.debug("%s:about to release setlock", name)
        self.producer_lock.release()
        logging.debug("%s:setlock released", name)
        return message

    def set_message(self, message, name):
        """Store `message` in the slot and allow one read of it.

        Blocks while the previously stored message has not been read yet.
        `name` is only used to label the debug log lines.
        """
        logging.debug("%s:about to acquire setlock", name)
        # Blocks until get_message() releases the producer lock (the first
        # call does not block because the lock starts out free).
        self.producer_lock.acquire()
        logging.debug("%s:have setlock", name)
        self.message = message
        logging.debug("%s:about to release getlock", name)
        self.consumer_lock.release()
        logging.debug("%s:getlock released", name)
if __name__ == "__main__":
    # Demo: run producer and consumer on two pool threads sharing one Pipeline.
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    # Enable DEBUG after basicConfig() so the lock-handoff trace is shown.
    logging.getLogger().setLevel(logging.DEBUG)

    pipeline = Pipeline()
    # Leaving the `with` block waits for both submitted calls to return.
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline)
        executor.submit(consumer, pipeline)

11:23:46: Producer got message: 70
11:23:46: Producer:about to acquire setlock
11:23:46: Producer:have setlock
11:23:46: Consumer:about to acquire getlock
11:23:46: Producer:about to release getlock
11:23:46: Producer:getlock released
11:23:46: Consumer:have getlock
11:23:46: Producer got message: 42
11:23:46: Producer:about to acquire setlock
11:23:46: Consumer:about to release setlock
11:23:46: Consumer:setlock released
11:23:46: Consumer storing message: 70
11:23:46: Consumer:about to acquire getlock
11:23:46: Producer:have setlock
11:23:46: Producer:about to release getlock
11:23:46: Producer:getlock released
11:23:46: Consumer:have getlock
11:23:46: Producer got message: 68
11:23:46: Consumer:about to release setlock
11:23:46: Producer:about to acquire setlock
11:23:46: Producer:have setlock
11:23:46: Consumer:setlock released
11:23:46: Producer:about to release getlock
11:23:46: Consumer storing message: 42
11:23:46: Producer:getlock released
11:23:46: Producer got message: 48
11

## Producer Consumer Problem using Queue

In [14]:
def producer(queue, event):
    """Keep putting random integers (1-10) on ``queue`` until ``event`` is set.

    Args:
        queue: Object with a ``put(item)`` method that receives each number.
        event: Object with ``is_set()``; checked before each number is produced.
    """
    while True:
        if event.is_set():
            break
        number = random.randint(1, 10)
        logging.info("Producer got message: %s", number)
        queue.put(number)

    logging.info("Producer received event. Exiting")

def consumer(queue, event, poll_timeout=0.1):
    """Log messages taken from ``queue`` until ``event`` is set and it is empty.

    The original unbounded ``queue.get()`` could block forever: if the loop
    condition was checked on an empty queue just before ``event`` was set and
    the producer then exited, nothing would ever wake the consumer. Waiting
    with a timeout lets the loop re-check its condition and exit.

    Args:
        queue: ``queue.Queue``-like object providing ``get(timeout=...)``,
            ``empty()`` and ``qsize()``.
        event: ``threading.Event``-like object; once set, the consumer exits
            after draining the remaining items.
        poll_timeout: Seconds to wait for an item before re-checking the
            loop condition.
    """
    # The `queue` parameter shadows the stdlib module inside this function,
    # so import the exception class locally under its own name.
    from queue import Empty

    while not event.is_set() or not queue.empty():
        try:
            message = queue.get(timeout=poll_timeout)
        except Empty:
            # Nothing arrived in time; loop back and re-evaluate the exit condition.
            continue
        logging.info(
            "Consumer storing message: %s (size=%d)", message, queue.qsize()
        )

    logging.info("Consumer received event. Exiting")

class Pipeline(queue.Queue):
    """Bounded FIFO queue (capacity 5) with debug-logged get/put wrappers."""

    def __init__(self):
        super().__init__(5)

    def get_message(self, name):
        """Take the next item off the queue and return it.

        `name` labels the debug log lines emitted before and after the get.
        """
        logging.debug("%s:about to get from queue", name)
        item = self.get()
        logging.debug("%s:got %d from queue", name, item)
        return item

    def set_message(self, value, name):
        """Put `value` on the queue.

        `name` labels the debug log lines emitted before and after the put.
        """
        logging.debug("%s:about to add %d to queue", name, value)
        self.put(value)
        logging.debug("%s:added %d to queue", name, value)
if __name__ == "__main__":
    # Demo: producer and consumer share a bounded queue and are stopped by an Event.
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    # NOTE(review): a plain queue.Queue (capacity 10) is used here; the
    # Pipeline subclass defined in this cell is not instantiated.
    pipeline = queue.Queue(maxsize=10)
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)

        # Let both workers run briefly, then signal them to stop. Leaving the
        # `with` block afterwards waits for both calls to return.
        time.sleep(0.1)
        logging.info("Main: about to set event")
        event.set()

11:38:52: Producer got message: 7
11:38:52: Producer got message: 8
11:38:52: Consumer storing message: 7 (size=0)
11:38:52: Producer got message: 5
11:38:52: Consumer storing message: 8 (size=0)
11:38:52: Consumer storing message: 5 (size=0)
11:38:52: Producer got message: 4
11:38:52: Producer got message: 5
11:38:52: Consumer storing message: 4 (size=0)
11:38:52: Producer got message: 9
11:38:52: Consumer storing message: 5 (size=0)
11:38:52: Producer got message: 3
11:38:52: Producer got message: 6
11:38:52: Consumer storing message: 9 (size=0)
11:38:52: Consumer storing message: 3 (size=1)
11:38:52: Consumer storing message: 6 (size=0)
11:38:52: Producer got message: 4
11:38:52: Producer got message: 1
11:38:52: Consumer storing message: 4 (size=0)
11:38:52: Consumer storing message: 1 (size=0)
11:38:52: Producer got message: 2
11:38:52: Producer got message: 2
11:38:52: Producer got message: 1
11:38:52: Producer got message: 3
11:38:52: Consumer storing message: 2 (size=0)
11:38:5