# Introduction to threading

Ref: [An Intro to Threading in Python by RealPython](https://realpython.com/intro-to-python-threading/#what-is-a-thread)

The threads may be running on different processors, but they will only be running **one at a time** (see GIL). Getting multiple tasks running simultaneously requires a non-standard implementation of Python, or using `multiprocessing`.

If you are running a standard Python implementation, writing in only Python, and have a CPU-bound problem, you should check out the `multiprocessing` module instead.

# Single-thread example

In [None]:
import threading
import logging
import time

In [9]:
def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    x = threading.Thread(target=thread_function, args=(1,), daemon=False)
    x.start()
    logging.info("Main    : wait for the thread to finish")
    # x.join()
    logging.info("Main    : all done")

18:42:32: Thread 1: starting
18:42:32: Main    : wait for the thread to finish
18:42:32: Main    : all done


`x = threading.Thread(target=thread_function, args=(1,), daemon=False)` create a non-daemon thread, thus the program will wait for the thread to finish before exiting.

However, if the `daemon` flag is `True` then the program won't wait for the thread, and the thread is killed when the main program terminates.

## Python daemon

Daemon in the Python threading module is different from the daemon in the Unix system.

A `daemon` thread will shut down immediately when the program exits. If a program is running `Threads` that are not `daemon`, then the program will wait for those threads to complete before it terminates.

## join() a Thread
When we want to wait for a thread to finish before terminating the program, we all `.join()` on the thread.

# threading.Lock

```
my_lock = threading.Lock()

my_lock.acquire()

# critical section

my_lock.release()
```

Or, using `with` (no need to explicitly call `.acquire()` and `.release()`)
```
my_lock = threading.Lock()
with my_lock:
    pass
```

In [18]:
my_lock = threading.Lock()
my_lock.acquire()

my_lock.release()

with my_lock:
    pass

# Producer-consumer pipeline using Lock

In [17]:
import concurrent
import random


class Pipeline:
    def __init__(self):
        self.message = 0
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        self.consumer_lock.acquire()  # prevent reading before message is written

    def set_message(self, message):
        self.producer_lock.acquire()  # prevent other threads from setting the message
        self.message = message
        self.consumer_lock.release()
        # before the message is read, no writing allowed, thus not releasing the producer lock

    def get_message(self):
        self.consumer_lock.acquire()
        message = self.message
        self.producer_lock.release()
        # before the message is set again, no reading allowed, thus not releasing the consumer lock
        return message


def producer(pipeline):
    for index in range(5):
        message = random.randint(1, 101)
        logging.info("Producer set message: %s", message)
        pipeline.set_message(message)
    pipeline.set_message(-1)


def consumer(pipeline):
    message = 0
    while message != -1:
        message = pipeline.get_message()
        if message != -1:
            logging.info("Consumer get message: %s", message)



if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    # logging.getLogger().setLevel(logging.DEBUG)

    pipeline = Pipeline()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline)
        executor.submit(consumer, pipeline)

22:53:57: Producer set message: 15
22:53:57: Producer set message: 88
22:53:57: Consumer get message: 15
22:53:57: Producer set message: 83
22:53:57: Consumer get message: 88
22:53:57: Producer set message: 60
22:53:57: Consumer get message: 83
22:53:57: Producer set message: 92
22:53:57: Consumer get message: 60
22:53:57: Consumer get message: 92


# Producer-consumer pipeline using queue.Queue

In [19]:
import queue


def producer(queue, stop_event):
    while not stop_event.is_set():
        message = random.randint(1, 101)
        logging.info("Producer set message: %s", message)
        queue.put(message)


def consumer(queue, stop_event):
    while not stop_event.is_set() or not queue.empty():
        message = queue.get()
        logging.info("Consumer get message: %s", message)


if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    pipeline = queue.Queue(maxsize=10)
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)

        time.sleep(0.1)
        logging.info("Main: about to set event")
        event.set()

00:01:09: Producer set message: 38
00:01:09: Producer set message: 81
00:01:09: Consumer get message: 38
00:01:09: Producer set message: 55
00:01:09: Consumer get message: 81
00:01:09: Producer set message: 27
00:01:09: Consumer get message: 55
00:01:09: Producer set message: 1
00:01:09: Consumer get message: 27
00:01:09: Producer set message: 53
00:01:09: Consumer get message: 1
00:01:09: Producer set message: 1
00:01:09: Consumer get message: 53
00:01:09: Producer set message: 46
00:01:09: Consumer get message: 1
00:01:09: Producer set message: 61
00:01:09: Consumer get message: 46
00:01:09: Producer set message: 71
00:01:09: Consumer get message: 61
00:01:09: Producer set message: 55
00:01:09: Consumer get message: 71
00:01:09: Producer set message: 75
00:01:09: Consumer get message: 55
00:01:09: Producer set message: 18
00:01:09: Consumer get message: 75
00:01:09: Producer set message: 54
00:01:09: Consumer get message: 18
00:01:09: Producer set message: 2
00:01:09: Consumer get me

## threading.Event

In [22]:
def thread_waiting(event):
    logging.info("Child\t\t:Thread is waiting for event")
    event.wait()
    logging.info("Child\t\t:Thread is executing")


if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    event = threading.Event()
    child = threading.Thread(target=thread_waiting, args=(event,))
    logging.info("Main\t\t:child starting")
    child.start()
    time.sleep(1)
    logging.info("Main\t\t:event set")
    event.set()
    child.join()
    logging.info("Main\t\t:terminating")

00:09:31: Main		:child starting
00:09:31: Child		:Thread is waiting for event
00:09:32: Main		:event set
00:09:32: Child		:Thread is executing
00:09:32: Main		:terminating
