<div style="color:red;background-color:black">
Diamond Light Source
<br style="color:red;background-color:antiquewhite"><h1>Python Language: Threading</h1>  

©2000-23 Chris Seddon 
</div>

## What is a Thread

A thread is a separate flow of execution through your code. Different threads may run the same code or entirely
different code, and they may do so at the same time.

Threads are used when you want to run multiple sections of code concurrently.  In most programming languages, 
threads can be run on different CPUs to achieve true concurrency, but often time slicing on the same CPU is used 
to create apparent concurrency.  When multiple CPUs are used, threading can greatly speed up code.

Most Python programs are run using CPython, which is the default implementation of Python.  CPython was originally
written when single-threaded programs were the norm, and it was felt that making the interpreter fully thread safe (see later
for what that really means) would slow down existing programs.  To avoid contention between threads, CPython uses a single
Global Interpreter Lock (GIL): a thread must hold the GIL to execute Python bytecode, so only one thread runs Python code
at any moment, effectively serialising threaded code.  Although this avoids errors, it also creates
a performance bottleneck; we will look at the GIL in subsequent examples.  Many attempts have been made to remove the 
GIL, but concerns about slowing down single-threaded code and breaking existing C extensions have so far prevented this.

In practice, threading is still useful for concurrent tasks, but your code won't necessarily run faster.  I/O-bound 
tasks spend a lot of time waiting (idle) for data to be ready.  For these tasks there is a real speed benefit: while one
thread is blocked waiting for I/O, another thread can run.  However, for CPU-bound tasks, switching threads won't speed
things up because no thread is ever idle; the threads simply take turns holding the GIL.
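To see the I/O-bound benefit, here is a minimal sketch. It uses `time.sleep` as a stand-in for waiting on I/O. This is an assumption: real blocking I/O, such as reading from a socket, behaves similarly because CPython releases the GIL while a thread is blocked. The function names `fake_io`, `run_sequentially` and `run_in_threads` are hypothetical.

```python
import time
from threading import Thread

def fake_io(delay):
    # time.sleep blocks without holding the GIL, much like waiting on a socket or file
    time.sleep(delay)

def run_sequentially(count, delay):
    start = time.perf_counter()
    for _ in range(count):
        fake_io(delay)
    return time.perf_counter() - start

def run_in_threads(count, delay):
    start = time.perf_counter()
    threads = [Thread(target=fake_io, args=(delay,)) for _ in range(count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start

sequential = run_sequentially(5, 0.2)   # roughly 5 x 0.2 = 1.0 s
threaded = run_in_threads(5, 0.2)       # roughly 0.2 s, because the waits overlap
print(f"sequential: {sequential:.2f} s, threaded: {threaded:.2f} s")
```

Replacing `fake_io` with a CPU-bound loop would remove most of this gain, for the reason given above.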

CPython has always had a GIL.  PEP 703 (accepted in 2023) introduces an optional, experimental build of CPython without
the GIL, but the default build still has one.  PyPy, the other popular implementation of Python, also has a GIL.  The good
news is that C/C++ extension libraries such as NumPy can release the GIL while they perform long-running computations, so
threads that call into them can run CPU-bound work in parallel.

If you want to run CPU-bound Python code in parallel, you should check out the multiprocessing module instead.
This gets around the GIL by running a separate Python interpreter in each process, each with its own GIL.

## 1 Creating Threads
Recall that threads are used to perform concurrent tasks.  Threads are ultimately created by the operating system (kernel), but as far as we are concerned we make a Python call to start a thread; the Python interpreter then contacts the kernel.

Python provides a helper class to manage threads.  Rather confusingly, this class is called "Thread".  Objects of this class are NOT threads, just helper objects!

All programs start with a single thread (often called the main thread).  When the main thread wants to create further threads, it creates objects of the helper class and calls their "start" method:<pre>thread1.start()
thread2.start()
thread3.start()</pre>

Realize that when the new threads start, they need to perform a different task (or function) from the main thread.  This task is specified as a parameter when the main thread creates the helper objects:<pre>thread1 = Thread(target=myfunc, args=("1",))
thread2 = Thread(target=myfunc, args=("2",))
thread3 = Thread(target=myfunc, args=("3",))</pre>
Creating the helper objects DOES NOT create any threads - calling the start method creates and starts a thread.
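The difference between the helper object and the thread itself can be checked directly with `is_alive()`. This is a small sketch; `work` is a hypothetical placeholder task.

```python
from threading import Thread

def work():
    pass  # placeholder task: the thread ends as soon as this returns

helper = Thread(target=work, name="worker-1")
alive_before = helper.is_alive()   # False: creating the helper started nothing
helper.start()                     # now a real OS thread is created and runs work()
helper.join()
alive_after = helper.is_alive()    # False again: the thread has finished
print(alive_before, alive_after)

try:
    helper.start()                 # a helper object can only ever start one thread
    restart_failed = False
except RuntimeError as exc:
    restart_failed = True
    print("RuntimeError:", exc)
```

To run the task again, create a new `Thread` object.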

After the "start" method has been called, the main thread and the other threads run concurrently.  Because the operating system may suspend a thread at any time, it is not possible to predict the order in which code will execute unless we use special synchronization objects.

In this example the main thread creates 3 other threads, all of which execute the "myfunc" function.  Each thread terminates when it returns from this function.  I've added some random delays to emphasize the parallel nature of this program:

In [None]:
import random
import time
import sys

from threading import Thread

def myfunc(name):
    for i in range(1, 50):
        sys.stdout.write(name)
        time.sleep(random.random() * 0.1)      # random delay of up to 0.1 s

# create the helper objects; each new thread will call myfunc(name) after start()
thread1 = Thread(target=myfunc, args=("1",))
thread2 = Thread(target=myfunc, args=("2",))
thread3 = Thread(target=myfunc, args=("3",))

thread1.start()
thread2.start()
thread3.start()

print("\nEnd of main Thread") 

## 2 Joining Threads
Note that the main thread is counted as just another thread (it is not special).  However, programs are often designed so that the main thread is the last to complete.  To achieve this, the main thread can wait for the other threads to complete before proceeding:<pre>thread1.join()
thread2.join()
thread3.join()</pre>

In [None]:
import random
import time
import sys

from threading import Thread

def myfunc(name):
    for i in range(1, 50):
        sys.stdout.write(name)
        time.sleep(random.random() * 0.1)      # random delay of up to 0.1 s

# create the helper objects; each new thread will call myfunc(name) after start()
thread1 = Thread(target=myfunc, args=("1",))
thread2 = Thread(target=myfunc, args=("2",))
thread3 = Thread(target=myfunc, args=("3",))

thread1.start()
thread2.start()
thread3.start()

thread1.join()
thread2.join()
thread3.join()

print("\nEnd of main Thread") 
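`join()` also accepts an optional timeout in seconds.  It always returns `None`, so call `is_alive()` afterwards to find out whether the thread actually finished.  This is a minimal sketch; `slow_task` is a hypothetical stand-in for real work.

```python
import time
from threading import Thread

def slow_task():
    time.sleep(1.0)   # stands in for a long-running job

t = Thread(target=slow_task)
t.start()

t.join(timeout=0.1)               # wait at most 0.1 s
still_running = t.is_alive()      # True: join timed out while the thread was still working
print("still running after timeout:", still_running)

t.join()                          # no timeout: wait until the thread finishes
finished = not t.is_alive()
print("finished:", finished)
```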

## 3 Using Methods as Callbacks
As an alternative to a plain function, the target of a thread can be an object whose class defines a "__call__" 
method; this ultimately depends on operator overloading.  In the previous example the callback function was "myfunc", 
and after the "start" method is called, Python calls back on this function.

We can equally specify an object as a callback.  Python will try to call this object, i.e. call the overloaded
() operator for its class.  Thus if the target is the object "m1":
<pre>            t1 = Thread(target = m1, args = ("1",))
</pre>
the callback will be on:
<pre>            m1("1")
</pre>            
and because of operator overloading, this is equivalent to calling the dunder method:
<pre>            m1.__call__("1")
</pre>
So you can use an object as the target provided its class has a `__call__` method.


In [None]:
from threading import Thread
import random
import time
import sys


# create a callable class
class MyClass:
    def __init__(self):
        pass
    
    def __call__(self, name):
        for i in range(1, 50):
            sys.stdout.write(name)        
            time.sleep(random.random() * 0.1)    

    
m1 = MyClass()
m2 = MyClass()
m3 = MyClass()

# define a callback class - __call__() to be called via start()
t1 = Thread(target = m1, args = ("1",))
t2 = Thread(target = m2, args = ("2",))
t3 = Thread(target = m3, args = ("3",))

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()

print("\nEnd of main")
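Any callable works as a target, so a bound method (`obj.method`) can be passed directly without defining `__call__`.  The sketch below also checks that `p("a")` and `p.__call__("b")` are the same call.  The `Printer` class and its `run` method are hypothetical names.

```python
from threading import Thread

class Printer:
    def __init__(self):
        self.output = []

    def __call__(self, name):
        self.output.append(f"call:{name}")

    def run(self, name):
        self.output.append(f"run:{name}")

p = Printer()
p("a")              # the () operator ...
p.__call__("b")     # ... is the same as calling the dunder method

# a bound method remembers its object, so it can be used directly as a target
t = Thread(target=p.run, args=("c",))
t.start()
t.join()
print(p.output)     # ['call:a', 'call:b', 'run:c']
```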

## 4 Locks
To control parallel threads we can use synchronization classes.  The most important is the "Lock" class.  A "Lock" 
object allows only one thread at a time to execute the code guarded by the lock.  A thread acquires a lock with:
<pre>           lock.acquire()</pre>

and releases it with:
<pre>           lock.release()</pre>

The code between these calls is guarded.  Such locks are often called monitor locks; they monitor code and only 
allow one thread at a time to execute the code between the "acquire" and "release" calls.

In this example, 4 threads execute the "task" function, but the shared lock (`lock`) serializes execution: each thread writes all of its output before the next one can start.  

If you try using two or more locks, you will find that only threads sharing the same lock are prevented from executing concurrently.


In [None]:
from threading import Thread, Lock
import random
import time
import sys


# task for threads
def task(name, lock):
    lock.acquire()        
    for i in range(1, 50):
        sys.stdout.write(name)
        time.sleep(random.random() * 0.1)
    lock.release()    

    
lock = Lock()

t1 = Thread(target = task, args = ("1", lock))
t2 = Thread(target = task, args = ("2", lock))
t3 = Thread(target = task, args = ("3", lock))
t4 = Thread(target = task, args = ("4", lock))

# start 4 threads: each start() call creates and runs a new thread
t1.start()
t2.start()
t3.start()
t4.start()
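`acquire()` can also be told not to wait forever.  `acquire(blocking=False)` returns immediately, and `acquire(timeout=...)` gives up after the given number of seconds.  Both return `True` only if the lock was obtained.  The minimal sketch below is single-threaded for clarity; because a `Lock` is not re-entrant, a second acquire by the same thread fails just as it would for another thread.

```python
from threading import Lock

lock = Lock()

got_first = lock.acquire()                  # True: the lock was free
got_second = lock.acquire(blocking=False)   # False: already held, don't wait
got_third = lock.acquire(timeout=0.1)       # False: still held after 0.1 s
print(got_first, got_second, got_third)     # True False False

lock.release()                              # release only a lock you actually acquired
print(lock.locked())                        # False
```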

## 5 Multiple Locks
If we modify the example slightly and create 2 locks, one shared by threads 1 and 3 and the other by threads 2 and 4, then the locks will prevent 1 and 3 from running simultaneously, and similarly 2 and 4.  Threads holding different locks (for example 1 and 2) can still interleave:

In [None]:
from threading import Thread, Lock
import random
import time
import sys


# task for threads
def task(name, lock):
    lock.acquire()        
    for i in range(1, 50):
        sys.stdout.write(name)
        time.sleep(random.random() * 0.1)
    lock.release()    

    
lockA = Lock()
lockB = Lock()

t1 = Thread(target = task, args = ("1", lockA))
t2 = Thread(target = task, args = ("2", lockB))
t3 = Thread(target = task, args = ("3", lockA))
t4 = Thread(target = task, args = ("4", lockB))

# start 4 threads: each start() call creates and runs a new thread
t1.start()
t2.start()
t3.start()
t4.start()

## 6 Sharing Data
As discussed previously, code using += is not thread safe.  As an illustration of this, we define two counts and
then increment both counts in 3 separate threads.  One of the counts is protected by a lock, but the other is not.
We increment many (10 million) times to maximise the chance of a thread being suspended in the critical section of 
code.  The protected count will always end up at 3 threads x 10 million = 30,000,000, but the unprotected count will
often end up lower as a result of contention between the threads.  (How often the race shows up depends on the CPython
version; recent versions switch threads less often inside such a loop, but the unprotected code is still not guaranteed
to be correct.)

Because programmers are prone to forgetting to release locks, threads "B" and "C" show alternative ways of using a
lock.  Thread "B" uses a try/finally block and thread "C" uses a with statement.  The with statement releases the lock
on exit just as the try/finally form does, so the two forms are equivalent.

Note that code is often modified after the initial design, and we might introduce code that could raise an exception
between acquiring the lock and releasing it.  That's why it's better to use the with statement than a bare
lock.release(): "with" is exception safe.


In [None]:
from threading import Thread
from threading import Lock

# 3 threads increment 2 Counters ...
# count1 is unprotected
# count2 is protected

N = 10*1000*1000

class M:
    lock = Lock()
    count1 = 0
    count2 = 0

    def __call__(self, name):        
        if name == "A":
            for i in range(0, N):
                M.count1 += 1
            M.lock.acquire()
            for i in range(0, N):
                M.count2 += 1
            M.lock.release()
        if name == "B":
            for i in range(0, N):
                M.count1 += 1
            M.lock.acquire()
            try:
                for i in range(0, N):
                    M.count2 += 1
            finally:
                M.lock.release()
        if name == "C":
            for i in range(0, N):
                M.count1 += 1
            with M.lock:
                for i in range(0, N):
                    M.count2 += 1

m1 = M()
m2 = M()
m3 = M()

t1 = Thread(target = m1, args = ("A",))
t2 = Thread(target = m2, args = ("B",))
t3 = Thread(target = m3, args = ("C",))

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()

print("")
print(f"M.count1: {M.count1}")
print(f"M.count2: {M.count2}")
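A common way to keep a lock together with the data it protects is to wrap both in a small class, so that callers cannot forget the lock.  This is a sketch; `SafeCounter` is a hypothetical name, and the smaller count keeps it quick.

```python
from threading import Thread, Lock

class SafeCounter:
    '''a counter whose increment is guarded by its own lock'''
    def __init__(self):
        self._lock = Lock()
        self.value = 0

    def increment(self):
        with self._lock:          # released even if an exception occurs
            self.value += 1

def worker(counter, n):
    for _ in range(n):
        counter.increment()

counter = SafeCounter()
threads = [Thread(target=worker, args=(counter, 100_000)) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter.value)              # 300000
```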

## 7 Data Corruption
The data corruption occurs because:<pre>
    count1 += 1</pre>
is not an atomic operation.  We can see this by examining the byte code using the disassembler module:

In [None]:
import dis
dis.dis("x += 1")
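`dis.get_instructions()` yields the same information as `Instruction` objects, so you can inspect the sequence programmatically.  Opcode names vary between Python versions (for example, INPLACE_ADD became BINARY_OP in 3.11), so this sketch only checks that the add and the store are separate instructions.

```python
import dis

ops = [ins.opname for ins in dis.get_instructions("x += 1")]
print(ops)

# the add happens in one instruction (INPLACE_ADD or BINARY_OP, depending on
# the Python version) and the store happens in a separate STORE_NAME after it
add_op = "BINARY_OP" if "BINARY_OP" in ops else "INPLACE_ADD"
gap = ops.index("STORE_NAME") - ops.index(add_op)
print(f"{add_op} and STORE_NAME are separate instructions (gap of {gap})")
```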

## 8 Atomic Instructions
What can happen is that the thread gets suspended just after the INPLACE_ADD instruction (BINARY_OP on Python 3.11 and later), before the result has been stored back in "x".  

Suppose "x" is some value, say 700.  Inside the interpreter, INPLACE_ADD adds 1 to 700 and leaves the result, 701, on the thread's evaluation stack.  If the thread is now suspended (CPython switches threads between bytecode instructions), that intermediate value stays on its stack.  

Other threads will now increment "x" many times.  Let's say "x" ends up with the value 3287 for the sake of argument.

Eventually, the original thread is resumed and continues where it left off.  It was just about to execute the STORE_NAME instruction; when it does, it stores the value 701 from its stack.  This overwrites 3287 with 701, losing all the increments made by the other threads in the meantime.  That's what happened to the unprotected count above.
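The interleaving described above can be forced deterministically.  The sketch splits the read-and-add and the store by hand and uses two Event objects (covered later in this tutorial) to pause thread A between them.  This is a simulation of the bytecode steps, not the interpreter's real internals.  The names `thread_a`, `thread_b`, `a_has_added` and `b_is_done` are hypothetical.

```python
import threading

x = 700
a_has_added = threading.Event()   # set when thread A has computed x + 1
b_is_done = threading.Event()     # set when thread B has finished its increments

def thread_a():
    global x
    temp = x + 1          # like LOAD_NAME + INPLACE_ADD: 701 is held locally
    a_has_added.set()
    b_is_done.wait()      # simulate being suspended before STORE_NAME
    x = temp              # like STORE_NAME: writes back the stale value 701

def thread_b():
    global x
    a_has_added.wait()    # make sure A has already read x
    for _ in range(3287 - 700):
        x += 1            # A is paused here, so x reaches 3287
    b_is_done.set()

ta = threading.Thread(target=thread_a)
tb = threading.Thread(target=thread_b)
ta.start()
tb.start()
ta.join()
tb.join()
print(x)                  # 701: all of B's increments were lost
```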

Conclusion: all non-atomic operations on shared data need to be protected by locks.

But how do we know if an operation is atomic?  Take a look at the following:

In [None]:
import dis
dis.dis("[2,5,3,6].sort()")        

## 9 Global Interpreter Lock (GIL)
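In the above, the call to the "sort" method is executed by a single bytecode instruction (CALL_METHOD, or CALL on Python 3.11 and later).  Because list.sort is implemented in C and this list contains only ints, the whole sort happens inside that one instruction; this is what makes it atomic.  CPython only switches threads between bytecode instructions, so a thread cannot be interrupted part way through one (unless C code running inside the instruction releases the GIL itself).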

The GIL is a lock that a thread must hold while the CPython interpreter executes bytecode for it; C code (in the
interpreter itself or in an extension) can release it explicitly.  CPython assumes that running bytecode in two threads at
the same time is not thread-safe, so the GIL is always in effect by default.  CPython releases the GIL periodically to
allow other threads to run.

Other implementations of Python, such as Jython and IronPython, have no GIL and can fully exploit 
multithreading.  PyPy currently has a GIL (similar to CPython).  Cython, which compiles Python-like code into CPython
extension modules, runs under CPython's GIL, but Cython code can release the GIL temporarily using a `with nogil:` statement. 

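In CPython (3.2 and later), a thread waiting for the GIL asks the running thread to give it up once a switch interval
has passed, 5 msec by default; the running thread releases it after completing its current bytecode instruction.
Operations consisting of a single bytecode instruction are therefore atomic and hence thread safe.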
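The switch interval can be inspected and changed with `sys.getswitchinterval()` and `sys.setswitchinterval()`.  In this minimal sketch the lower value is only for experimentation, and the original value is restored at the end.

```python
import sys

old = sys.getswitchinterval()
print(old)                        # 0.005 by default (5 ms)

# change the interval, e.g. to experiment with thread-switching behaviour
sys.setswitchinterval(0.001)
new_interval = sys.getswitchinterval()
print(new_interval)

sys.setswitchinterval(old)        # restore the previous value
```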

A thread may release the GIL voluntarily to allow another thread to run.  A thread only needs to hold the GIL 
while it works with Python objects, so CPython will release the GIL and allow another thread to run if the thread
holding the GIL performs I/O operations or other blocking calls into the OS like select() and pthread_mutex_lock().
The GIL may also be released by library code written in C/C++.  Note that there is no direct way of releasing the 
GIL in Python code (but there is if you are writing a C/C++ extension).

The instruction:
<pre>           count1 += 1</pre>
compiles to several bytecode instructions and is therefore non-atomic, whereas an operation consisting of a single
bytecode instruction, such as the sort above, is thread safe.

## 10 Condition Variables

Apart from locks, there are some other synchronization primitives to consider.  Let's now look at the producer/consumer code below.  The producer will create data for each of the consumers, but it might take some time to do so.  It is important that the consumers don't attempt to use the data before it is available.

We can use a "condition" variable to synchronize the threads:
<pre>           dataAvailable = threading.Condition()</pre>
The consumers all wait on the "condition" variable:
<pre>           dataAvailable.wait()</pre>
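until the producer is ready to provide the data.  The producer notifies all the consumers that they can proceed with:
<pre>           dataAvailable.notify_all()</pre>
Both calls must be made while holding the condition's underlying lock, which is why the code below wraps them in `with dataAvailable:` blocks.  (`notifyAll()` is an older, deprecated alias for `notify_all()`.)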

In [None]:
import threading
from threading import Thread
import random
import time
import sys

class Producer:
    def __call__(self, dataAvailable):
        print("Producer is obtaining data")
        time.sleep(5)
        with dataAvailable:         # grab the lock
            print("Producer is notifying all consumers")
            dataAvailable.notify_all()

class Consumer:
    def __call__(self, name, dataAvailable):
        with dataAvailable:
            print(f"consumer{name} is waiting")
            dataAvailable.wait()
            print(f"consumer{name} has been notified that the data is available")

    
dataAvailable = threading.Condition()

producer = Producer()
consumer1 = Consumer()
consumer2 = Consumer()
consumer3 = Consumer()

# pass the shared condition variable to every thread
t = Thread(target = producer, args = (dataAvailable,))
t1 = Thread(target = consumer1, args = ("1", dataAvailable))
t2 = Thread(target = consumer2, args = ("2", dataAvailable))
t3 = Thread(target = consumer3, args = ("3", dataAvailable))

t.start()
t1.start()
t2.start()
t3.start()

t.join()
t1.join()
t2.join()
t3.join()

print("\nEnd of main")
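A weakness of the example above is that a consumer that only starts waiting after the producer has called `notify_all()` would wait forever; the 5-second sleep merely makes this unlikely.  A more robust pattern pairs the condition with shared state and uses `wait_for()`, which checks a predicate before waiting and again after each wakeup.  This is a minimal sketch; the `shared` dictionary and its `data` key are hypothetical.

```python
import threading
import time

dataAvailable = threading.Condition()
shared = {"data": None}           # the state the condition protects
results = []

def producer():
    time.sleep(0.2)               # pretend that obtaining the data takes a while
    with dataAvailable:
        shared["data"] = [1, 2, 3]
        dataAvailable.notify_all()

def consumer(name):
    with dataAvailable:
        # returns at once if the data is already there, otherwise waits for a notify
        dataAvailable.wait_for(lambda: shared["data"] is not None)
        results.append((name, sum(shared["data"])))

workers = [threading.Thread(target=consumer, args=(str(i),)) for i in range(1, 4)]
workers.append(threading.Thread(target=producer))
for t in workers:
    t.start()

time.sleep(0.5)                   # by now the producer has very likely notified already
late = threading.Thread(target=consumer, args=("late",))
late.start()                      # wait_for sees the data and does not block

for t in workers + [late]:
    t.join()
print(sorted(results))            # [('1', 6), ('2', 6), ('3', 6), ('late', 6)]
```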

## 10 Events
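Event objects are similar to condition variables, but simpler.  An event has an internal flag, and once the flag is set, wait() returns immediately, so a thread that starts waiting after the event was set is not left blocked.  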

The event object is created by:
<pre>           event = Event()</pre>
and any thread can wait on the event:
<pre>           event.wait()</pre>
All waiting threads are released when any thread "sets" the event:
<pre>           event.set()</pre>

In [None]:
from threading import Thread
from threading import Event
import random
import time
import sys


class MyClass:
    def __call__(self, name):
        # event is a module-level global; reading it needs no "global" statement
        print(f"{name} waiting for event")
        event.wait()
        print(f"\t{name} proceeding after event")


event = Event()

m1 = MyClass()
m2 = MyClass()
m3 = MyClass()

t1 = Thread(target = m1, args = ("1",))
t2 = Thread(target = m2, args = ("2",))
t3 = Thread(target = m3, args = ("3",))

t1.start()
t2.start()
t3.start()

print("... main waiting for 15 seconds")
time.sleep(15)
print("... main setting event flag")
event.set()

t1.join()
t2.join()
t3.join()

print("\nEnd of main")
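The flag behaviour can be checked without any extra threads.  `wait(timeout)` returns `False` if it timed out and `True` once the flag is set, and the flag stays set until `clear()` is called.  This is a minimal sketch.

```python
from threading import Event

event = Event()
initially_set = event.is_set()        # False: the internal flag starts cleared
timed_out = event.wait(timeout=0.1)   # False: nobody set the event in time

event.set()
first_wait = event.wait()             # True: returns at once, the flag is set
second_wait = event.wait()            # True again: the flag stays set ...

event.clear()                         # ... until a thread clears it
after_clear = event.is_set()          # False
print(initially_set, timed_out, first_wait, second_wait, after_clear)
```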

## 11 Bounded Semaphores
Bounded semaphores are like a set of multiple locks.  A bounded semaphore is created with an initial count:
<pre>           semaphore = BoundedSemaphore(3)</pre>

A thread acquires the semaphore by decrementing the count:
<pre>           semaphore.acquire()</pre>

However, the count can never go negative.  So after 3 threads have acquired the semaphore, the next thread will be blocked until another thread releases the semaphore, incrementing the count by one:
<pre>           semaphore.release()</pre>

This continues until all the threads have acquired and released the semaphore.  Thus this bounded semaphore behaves as a set of 3 locks: at most 3 threads can be inside the guarded code at once.  The "bounded" part means that releasing the semaphore more times than it was acquired raises a ValueError, catching a programming error that a plain Semaphore would silently allow.

In [None]:
from threading import Thread
from threading import BoundedSemaphore
import random
import time
import sys


class MyClass:
    def __call__(self, name):
        # semaphore is a module-level global; "with" acquires it and always releases it
        with semaphore:
            print(name + " claimed semaphore")
            time.sleep(5)
            print("\t" + name + " released semaphore")



semaphore = BoundedSemaphore(3)

m1 = MyClass()
m2 = MyClass()
m3 = MyClass()
m4 = MyClass()
m5 = MyClass()
m6 = MyClass()
m7 = MyClass()

t1 = Thread(target = m1, args = ("1",))
t2 = Thread(target = m2, args = ("2",))
t3 = Thread(target = m3, args = ("3",))
t4 = Thread(target = m4, args = ("4",))
t5 = Thread(target = m5, args = ("5",))
t6 = Thread(target = m6, args = ("6",))
t7 = Thread(target = m7, args = ("7",))

t1.start()
t2.start()
t3.start()
t4.start()
t5.start()
t6.start()
t7.start()

t1.join()
t2.join()
t3.join()
t4.join()
t5.join()
t6.join()
t7.join()

print("\nEnd of main")
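The difference between a bounded and a plain semaphore shows up on an extra `release()`.  In this minimal sketch, both start with a count of 3.

```python
from threading import BoundedSemaphore, Semaphore

bounded = BoundedSemaphore(3)
plain = Semaphore(3)

bounded.acquire()
bounded.release()              # fine: back to the initial count of 3
try:
    bounded.release()          # one release too many
    over_release_detected = False
except ValueError:
    over_release_detected = True
print("BoundedSemaphore raised ValueError:", over_release_detected)

plain.release()                # a plain Semaphore silently lets the count grow to 4
acquired = [plain.acquire(blocking=False) for _ in range(5)]
print(acquired)                # [True, True, True, True, False]
```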

## 12 Barriers
Barriers are yet another synchronization object.  A barrier is created with the number of threads that must wait on it (its "parties") and an optional timeout in seconds:
<pre>           b = Barrier(5, timeout=10)</pre>

In this example a server and 4 clients synchronize by waiting on this barrier in their respective threads:
<pre>           b.wait()</pre>

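When all five threads are waiting, the barrier is satisfied: all 5 threads are released and continue, and the barrier resets so that it can be reused.  If the threads have not all arrived within the timeout, the barrier is broken and the waiting threads get a BrokenBarrierError.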

In [None]:
from threading import Thread, Barrier
import time


# In this example a server and 4 clients synchronize by waiting on a barrier
# in their respective threads.  When all five threads are waiting, 
# the barrier releases them and all 5 threads continue.

b = Barrier(5, timeout=10)

class Server:
    def __init__(self):
        print("server initializing ...")
        self.thread = Thread(target=self)
        self.thread.start()

    def __call__(self):
        time.sleep(5)
        b.wait()
        print("server ready to accept connections")
        
    def connect(self, client):
        print(f"{client.name} has connected")
        
class Client:
    def __init__(self, name, server):
        self.name = name
        self.server = server
        print(f"{self.name} waiting to connect")
        self.thread = Thread(target=self)
        self.thread.start()
    
    def __call__(self):
        b.wait()
        self.server.connect(self)

def main():
    server = Server()
    client1 = Client("client1", server)
    client2 = Client("client2", server)
    client3 = Client("client3", server)
    client4 = Client("client4", server)
    
    server.thread.join()
    client1.thread.join()
    client2.thread.join()
    client3.thread.join()
    client4.thread.join()
    
    print("end of program")
    
main()
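Two more `Barrier` features are shown in this small sketch.  An optional `action` callable is run once by one of the threads when the barrier is satisfied, and `wait()` returns a distinct index for each thread.  The second half shows a timed-out wait breaking the barrier.  The names `log`, `indices` and `lonely` are hypothetical.

```python
from threading import Barrier, BrokenBarrierError, Thread

log = []
# action runs exactly once, in one of the threads, when the barrier is satisfied
barrier = Barrier(3, action=lambda: log.append("all arrived"))
indices = []

def party():
    indices.append(barrier.wait())   # each thread gets a distinct index 0..2

threads = [Thread(target=party) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(log, sorted(indices))          # ['all arrived'] [0, 1, 2]

# with too few parties the wait times out and the barrier is broken
lonely = Barrier(2)
try:
    lonely.wait(timeout=0.1)
    timed_out = False
except BrokenBarrierError:
    timed_out = True
print("BrokenBarrierError:", timed_out, "broken:", lonely.broken)
```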

## 13 Timers
I should mention the simple Timer, which runs a function once, in a separate thread, after a delay given in seconds:

In [None]:
from threading import Timer

def hello():
    print("hello, world")

t = Timer(15.0, hello)
t.start() # after 15 seconds, "hello, world" will be printed
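A Timer is a thread, so it can be joined.  It can also be cancelled with `cancel()` while it is still waiting.  This is a minimal sketch with short delays; the `fired` list is a hypothetical way of recording calls.

```python
from threading import Timer

fired = []

def hello():
    fired.append("hello, world")

t = Timer(0.5, hello)
t.start()
t.cancel()           # the timer is still waiting, so hello() will never run
t.join()
after_cancel = list(fired)
print(after_cancel)  # []

t2 = Timer(0.1, hello)
t2.start()
t2.join()            # wait for the timer thread to fire and finish
print(fired)         # ['hello, world']
```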

## 14 Benchmarking
Finally, as mentioned in the introduction to this tutorial, in multithreaded CPU-bound programs only one thread can execute Python bytecode at a time because of the GIL, so the threads effectively run one after another.  Performance then becomes an issue.

It is recommended to use the multiprocessing module to speed things up in such situations.  We don't use threads in this case, but execute code in separate processes, each with its own interpreter and GIL, so the GIL no longer serializes the work.

It is interesting to compare the timings of a multithreaded program with a multiprocessing program.  Both programs calculate  

$$\sum_{i=0}^{M-1} i^{0.3}, \qquad M = 50\,000\,000$$  
that is, i ranges from 0 up to (but not including) 50,000,000.  We can see the performance of both with varying numbers of threads and processes (don't worry too much about the code details):

Conclusion: threads do not speed up CPU-bound calculations; use multiple processes.

In [None]:
import time
from itertools import chain
from multiprocessing import Pool
from threading import Thread

''' Calculate the sum of i**0.3 where i ranges from 0 to M-1
    Use multiple threads or processes to perform the calculation
    Split the calculation into ranges using the intervals function below
'''

M = 50*1000*1000

def calculate(lo, hi):
    '''the calculation to perform: sum of i**0.3 for lo <= i < hi'''
    total = 0
    for i in range(lo, hi):
        total += float(i)**0.3
    return total

def intervals(duration, parts):
    '''splits the range [0, duration) into several (parts) contiguous ranges;
       the last range absorbs any remainder so that no values are lost'''
    part_duration = duration // parts
    bounds = [(i * part_duration, (i + 1) * part_duration) for i in range(parts)]
    bounds[-1] = (bounds[-1][0], duration)
    return bounds

# calculate the sum using multiple threads (the partial results are discarded; only timing matters here)
def jobUsingThreads(threadCount):
    threadList = []
    for lo, hi in intervals(M, threadCount):
        t = Thread(target=calculate, args=(lo, hi))
        t.start()
        threadList.append(t)

    for t in threadList:
        t.join()

# calculate the sum using multiple processes
def jobUsingProcesses(processCount):
    with Pool(processes=processCount) as p:
        result = p.starmap(calculate, intervals(M, processCount))
    return sum(result)

# the __main__ guard is required when processes are started with "spawn" (e.g. Windows, macOS)
if __name__ == "__main__":
    # run job with varying number of processes
    for N in chain(range(1, 11), range(20, 101, 20)):
        start = time.perf_counter()
        jobUsingProcesses(N)
        finish = time.perf_counter()
        print(f"{N:3} processes:{finish-start:6.2f}")

    # run job with varying number of threads
    for N in chain(range(1, 11), range(20, 101, 20)):
        start = time.perf_counter()
        jobUsingThreads(N)
        finish = time.perf_counter()
        print(f"{N:3} threads:{finish-start:6.2f}")
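The benchmark only measures time and throws the thread results away.  As a sanity check that splitting the range loses nothing, here is a small-scale sketch.  It reduces M to 10,001 and uses 7 uneven parts, both arbitrary choices.  It collects each thread's partial sum and compares the total with the single-range result.  It repeats `calculate` and uses a version of `intervals` whose last range absorbs the remainder, so that it is self-contained; `worker` and `partial` are hypothetical names.

```python
import math
from threading import Thread

def calculate(lo, hi):
    total = 0
    for i in range(lo, hi):
        total += float(i)**0.3
    return total

def intervals(duration, parts):
    part_duration = duration // parts
    bounds = [(i * part_duration, (i + 1) * part_duration) for i in range(parts)]
    bounds[-1] = (bounds[-1][0], duration)   # last range absorbs the remainder
    return bounds

M = 10_001
parts = intervals(M, 7)
partial = [0.0] * len(parts)

def worker(index, lo, hi):
    partial[index] = calculate(lo, hi)       # each thread writes only its own slot

threads = [Thread(target=worker, args=(k, lo, hi)) for k, (lo, hi) in enumerate(parts)]
for t in threads:
    t.start()
for t in threads:
    t.join()

expected = calculate(0, M)
print(math.isclose(sum(partial), expected))   # True
```

Writing to separate list slots avoids sharing a single running total between threads, so no lock is needed here.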