# threading

In [1]:
import threading

## 1. Use Thread as a worker

In [2]:
def do_some_work(val):
    print("Doing some work in thread")
    print("echo: {}".format(val))
    return

In [3]:
val1 = "Hello World"
val2 = "Goodbye World"
# Create threads (workers in this senario)
t1 = threading.Thread(target=do_some_work, args=(val1,))
t2 = threading.Thread(target=do_some_work, args=(val2,))
# Start worker
t1.start()
t2.start()
# Prevent execution until both workers finished their jobs
t1.join()
t2.join()

Doing some work in thread
echo: Hello World
Doing some work in thread
echo: Goodbye World


## 2. Use Thread as work unit (class/object)
- Use inheritance

In [4]:
class FibonacciThread(threading.Thread):
    def __init__(self, num):
        super().__init__()
        self.num = num
    
    def run(self):
        fib = [0] * (self.num + 1)
        fib[0] = 0
        fib[1] = 1
        for i in range(2, self.num + 1):
            fib[i] = fib[i - 1] + fib[i - 2]
        print(fib[self.num])

In [5]:
myFibTask1 = FibonacciThread(9)
myFibTask2 = FibonacciThread(12)

myFibTask1.start()
myFibTask2.start()

myFibTask1.join()
myFibTask2.join()

34
144


### 2.0. threading.run() vs threading.start()
- Invoking start() will create a new thread and execute run() in this new thread. 
- Invoking run() yourself will execute it in the current thread itself. Execution of run() will not switch to a different thread. So it will execute its actions on the main thread itself.
- Overwrite run() if you want to modify thread behavior
- Always call start() since it will actually start a new thread

## 3. Thread Synchronization
### 3.0. Scheduler: An operating system module that selects the next job to be admitted into the system and the next process to run
### 3.1. Context Switch: The process of saving and restoring the state of a thread or process
### 3.2. Best practice: Keep access shared resources between threads at minimum
### 3.3. Lock: Mechanism to prevent other threads from using certain resources
- acquire: Lock a lock
- release: Unlock a locked lock

In [6]:
lock = threading.Lock()
# a. acquire & release
try:
    lock.acquire()
    # ... access shared resource
finally:    
    lock.release()
# b. use context manager
with lock:
    # ... access shared resource
    pass

### 3.4. Lock example comparison
#### 3.4.0. Senario A - Lock resource and start threads at the same time (almost)

In [7]:
import time

def add_one(lock, arr, n):
    time.sleep(1)
    with lock:
        for i in range(n):
            arr.append(1)
        return arr    

lock = threading.Lock()
arr = [2,3,4]
t1 = threading.Thread(target=add_one, args=(lock, arr, 1000000))
t2 = threading.Thread(target=add_one, args=(lock, arr, 900000))
t1.start()
t2.start()
t1.join()
print("With lock, 2 threads write arr in sequence: {}".format(len(arr)))
t2.join()

With lock, 2 threads write arr in sequence: 1027740


#### 3.4.1. Senario B - Not Lock resouce, but t2 starts after t1 is finished

In [8]:
def add_one(lock, arr, n):
#     with lock:
        time.sleep(1)
        for i in range(n):
            arr.append(1)
        return arr    

lock = threading.Lock()
arr = [2,3,4]
t1 = threading.Thread(target=add_one, args=(lock, arr, 1000000))
t2 = threading.Thread(target=add_one, args=(lock, arr, 900000))
t1.start()
t1.join()
# Start t2 after t1 finished
t2.start()
print("Without lock, but join t1 before t2 starts, 2 threads write arr in sequence: {}".format(len(arr)))
t2.join()

Without lock, but join t1 before t2 starts, 2 threads write arr in sequence: 1000003


#### 3.4.2. Senario C - Not lock resource and start threads at the same time (almost)

In [9]:
def add_one(lock, arr, n):
#     with lock:
        time.sleep(1)
        for i in range(n):
            arr.append(1)
        return arr    

lock = threading.Lock()
arr = [2,3,4]
t1 = threading.Thread(target=add_one, args=(lock, arr, 1000000))
t2 = threading.Thread(target=add_one, args=(lock, arr, 900000))
t1.start()
t2.start()
t1.join()
print("Without lock, 2 threads write arr at the same time: {}".format(len(arr)))
t2.join()

Without lock, 2 threads write arr at the same time: 1900003


#### 3.4.3. Conclution
- We can tell the length from Senario C is apparently larger than Senario A & B, since 2 threads write to 'arr' at the same time
- **Senario B** is has no lock is used, but thread is running in sequence. No matter how many times you run the cell, the length will always be fixed.
- **Senario A** is using lock, we are seeing length slightly over 'n' in t1, that's because there is a time difference between t1.join() and the print statement right below it.
- **Senario C** has no lock used, and 2 threads starts at the same time. At the time the print() statement run, t2 has already finished all/most of its job (depends on your CPU), becuase of async start of threads and no lock.

### 3.5. Semaphore
- Semaphore is a counter, where it will never go below 0
- acquire: decrease semaphore by 1
- release: increase semaphore by 1

### 3.6. Other mechanism: event, condition

### 3.7. Preferred mechanism - Queue
#### 3.7.0. Producer & Consumer Pattern 
- Producer spawn multiple tasks
- Consumer keep retrieving from queue until tasks are all finished

In [10]:
import time
from queue import Queue
from threading import Thread

# Pretend this is a time consuming task
def make_an_item_available(i):
    time.sleep(0.2)
    return i*i

# Producer: Processing job and add result to queue
def producer(queue):
    for i in range(10):
        item = make_an_item_available(i)
        queue.put(item)

# Consumer: Consume job until manually terminated (send 'None' to queue)
def consumer(queue):
    while True:
        # Will hang until queue is not empty, get(block=False) will terminate hanging 
        item = queue.get()  
        if item is None: break
        arr.append(item)
        queue.task_done()  # mark the item as done

In [11]:
start = time.perf_counter()        
queue, arr = Queue(), []
t1 = Thread(target=producer, args=(queue,))
t2 = Thread(target=consumer, args=(queue,))
t1.start()
t2.start()
t1.join()  # Wait all task stored in queue
queue.put(None)  # Stop consumer
t2.join()  # Prevent execution until t2 finished
print("Time consumed {}".format(time.perf_counter() - start))
print(arr)

Time consumed 2.0134524710010737
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]


#### 3.7.1. Producer & Consumer Pattern (modified example)
- Producer spawn & start multiple threads
- Use 1 thread each on producer and consumer 
- join() producer thread so that consumer thread will process all result from producer tasks
- Consumer process results after each task finished

In [12]:
import time
from threading import Thread
from queue import Queue

# Pretend this is a time consuming task
def make_an_item_available(arr, i):
    time.sleep(0.2)
    arr.append(i*i)

# Producer create multiple threads
def producer(queue):
    for i in range(10):
        t = Thread(target=make_an_item_available, args=(arr, i, ))
        t.start()
        queue.put(t)

def consumer(queue):
    while True:
        t = queue.get()  # Will hang until queue is not empty, get(block=False) will terminate hanging 
        if t is None: break
        thread_list.append(t.getName())  # do something with thread
        queue.task_done()  # mark the item as done

In [13]:
start = time.perf_counter()        
queue, thread_list = Queue(), []
arr = []
t1 = Thread(target=producer, args=(queue,))
t2 = Thread(target=consumer, args=(queue,))
t1.start()
t2.start()
# Wait all threads finished 
t1.join()
# Stop consumer
queue.put(None)
t2.join()
time.sleep(0.2)
print("Time consumed {}".format(time.perf_counter() - start))
print(arr)
print(thread_list)

Time consumed 0.20948488900103257
[0, 36, 25, 1, 16, 9, 4, 49, 64]
['Thread-18', 'Thread-19', 'Thread-20', 'Thread-21', 'Thread-22', 'Thread-23', 'Thread-24', 'Thread-25', 'Thread-26', 'Thread-27']


#### 3.7.2. Best Practice - Fixed number of threads is preferred otherwise it's dangerous to spawn too many threads
#### 3.7.3. Simple multithreading on some work

In [14]:
def work(num, i):
    time.sleep(1)  # Time consuming job here
    num.append(i)
    
def spawn_threads(num):
    thread_list = []
    for i in range(thread_count):
        t = Thread(target=work, args=(num, i, ))
        t.start()
        thread_list.append(t)
    for t in thread_list:
        t.join()

In [15]:
start = time.perf_counter()
num = []
thread_count = 5
spawn_threads(num)
print("Result: {}".format(num))
print("Time spent: {}".format(time.perf_counter() - start))

Result: [1, 0, 2, 3, 4]
Time spent: 1.0076971920007054
