#### Multithreading

https://python3-cookbook.readthedocs.io/zh_CN/latest/c12/p01_start_stop_thread.html

Multithreading basics:

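- After you create a thread object, it does not run until you call its start() method. When you call start(), it invokes the function you passed in, with the arguments you passed in.
- Once started, a thread runs independently until the target function returns. You can query a thread object to see whether it is still running:
    - t.is_alive()
- You can also join a thread to the current thread and wait for it to terminate:
    - t.join()
- For long-running threads or background tasks that run forever, consider using a daemon thread. The interpreter does not wait for daemon threads at exit: they are destroyed automatically when the main program terminates. (You can still call join() on a daemon thread explicitly.)
- Because of the Global Interpreter Lock (GIL), Python threads are restricted to an execution model in which only one thread executes Python bytecode at a time. Python threads are therefore better suited to I/O and other blocking operations that benefit from concurrency (waiting for I/O, waiting for results from a database, and so on) than to CPU-bound tasks that need multiprocessor parallelism.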
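A minimal sketch of the thread lifecycle described in the list above: nothing runs until start(), is_alive() reports whether the target is still executing, and join() waits for it to return. The worker function `countdown` and its arguments are hypothetical, chosen only for illustration.

```python
import threading
import time

def countdown(n, results):
    # Hypothetical worker: record each value of n, pausing briefly in between
    while n > 0:
        results.append(n)
        time.sleep(0.05)
        n -= 1

results = []
t = threading.Thread(target=countdown, args=(3, results))
print(t.is_alive())   # False: the thread object exists but has not been started
t.start()             # countdown(3, results) now runs in a new thread
print(t.is_alive())   # most likely True: the worker is still sleeping
t.join()              # block until countdown() returns
print(t.is_alive())   # False: the thread has finished
print(results)        # [3, 2, 1]
```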

In [1]:
import threading
import time
from queue import Queue

In [2]:
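# example: two threads running the same function with different delays
def print_time(threadName, delay):
    count = 0
    while count < 5:
        time.sleep(delay)
        count += 1
        print("%s: %s" % (threadName, time.ctime(time.time())))

# Create and start two threads.
# Note: this cell does not join() them, so their output shows up in later cells.
try:
    threading.Thread(target=print_time, args=("Thread-1", 2)).start()
    threading.Thread(target=print_time, args=("Thread-2", 4)).start()
except RuntimeError:  # raised by start() when a new thread cannot be created
    print("Error: unable to start thread")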

#### Key APIs in the threading module

The threading module builds on the low-level `_thread` module (named `thread` in Python 2) and adds some module-level functions:
- threading.active_count(): returns the number of Thread objects currently alive (activeCount() is a deprecated alias).
- threading.current_thread(): returns the Thread object corresponding to the caller's thread of control (currentThread() is a deprecated alias).
- threading.enumerate(): returns a list of all Thread objects currently alive.
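A small sketch of the three module-level functions. It starts two hypothetical `idle` threads that block on an Event, so they are guaranteed to be alive while we inspect them. The exact count and names depend on the host: a notebook kernel, for example, runs extra threads of its own.

```python
import threading

stop = threading.Event()

def idle():
    # Hypothetical helper: block until the main thread sets the event
    stop.wait()

workers = [threading.Thread(target=idle, name=f"idle-{i}") for i in range(2)]
for w in workers:
    w.start()

alive = threading.active_count()                 # main thread + 2 workers (+ any threads the host runs)
names = [t.name for t in threading.enumerate()]  # every Thread object currently alive
print(threading.current_thread().name)           # "MainThread" when run as a plain script
print(alive, names)

stop.set()          # let the workers return
for w in workers:
    w.join()
```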

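The Thread class represents a thread of control. Its main methods are:
- run(): the entry point for a thread. Override it in a subclass, or pass a target callable to the constructor, which the default run() calls.
- start(): starts the thread; it arranges for run() to be invoked in a separate thread of control. It may be called at most once per Thread object.
- join([timeout]): waits until the thread terminates, or until the optional timeout (in seconds) elapses.
- is_alive(): checks whether the thread is still executing (the old isAlive() alias was removed in Python 3.9).
- getName(): returns the name of the thread. Deprecated since Python 3.10; read the name attribute instead.
- setName(): sets the name of the thread. Deprecated since Python 3.10; assign to the name attribute instead.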
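A sketch of join() with a timeout and of the name attribute that replaces getName()/setName(). The `slow_task` function and the thread names are hypothetical. Since join() always returns None, the way to tell whether it timed out is to call is_alive() afterwards.

```python
import threading
import time

def slow_task():
    # Hypothetical task that takes about half a second
    time.sleep(0.5)

t = threading.Thread(target=slow_task, name="slow-worker")
t.start()

t.join(timeout=0.1)          # returns after ~0.1 s even though slow_task is still running
timed_out = t.is_alive()     # still alive, so the join timed out
print(t.name, timed_out)     # slow-worker True

t.name = "renamed-worker"    # the name attribute replaces setName()
t.join()                     # no timeout: wait until slow_task returns
print(t.name, t.is_alive())  # renamed-worker False
```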

In [3]:
# example of a Thread subclass
class myThread(threading.Thread):
    def __init__(self, threadID, name, counter):
        super().__init__()
        self.threadID = threadID
        self.name = name
        self.counter = counter  # used as the delay (in seconds) between prints

    def run(self):
        # run() executes in the new thread once start() is called
        print("Starting " + self.name)
        print_time(self.name, 3, self.counter)  # print 3 times, self.counter seconds apart
        print("Exiting " + self.name)

def print_time(threadName, counter, delay):
    while counter:
        time.sleep(delay)
        print("%s: %s" % (threadName, time.ctime(time.time())))
        counter -= 1

# Create new threads
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)

# Start new threads
thread1.start()
thread2.start()

# Calling run() directly would execute it in the current (main) thread instead:
# thread1.run()
# thread2.run()

# The threads are not joined, so this line prints right away and the rest
# of the threads' output appears in later cells
print("Exiting Main Thread")

Starting Thread-1
Starting Thread-2
Exiting Main Thread


In [4]:
# example of a Thread subclass with a lock
class myThread(threading.Thread):
    def __init__(self, threadID, name, counter):
        super().__init__()
        self.threadID = threadID
        self.name = name
        self.counter = counter  # used as the delay (in seconds) between prints

    def run(self):
        print("Starting " + self.name)
        # Hold the lock while printing so the threads take turns;
        # the with statement releases it even if print_time() raises
        with threadLock:
            print_time(self.name, self.counter, 3)
        print("Exiting " + self.name)

def print_time(threadName, delay, counter):
    while counter:
        time.sleep(delay)
        print("%s: %s" % (threadName, time.ctime(time.time())))
        counter -= 1

threadLock = threading.Lock()
threads = []

# Create new threads
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)

# Start new threads
thread1.start()
thread2.start()

# Add threads to thread list
threads.append(thread1)
threads.append(thread2)

# Wait for all threads to complete
# (the recorded output below also contains lines printed by threads started in earlier cells)
for t in threads:
    t.join()
print("Exiting Main Thread")

Starting Thread-1
Starting Thread-2
Thread-1: Wed Mar 17 23:16:30 2021
Thread-1: Wed Mar 17 23:16:30 2021
Thread-1: Wed Mar 17 23:16:31 2021
Thread-1: Wed Mar 17 23:16:31 2021Thread-2: Wed Mar 17 23:16:31 2021

Thread-1: Wed Mar 17 23:16:31 2021
Thread-1: Wed Mar 17 23:16:32 2021
Exiting Thread-1
Thread-1: Wed Mar 17 23:16:32 2021
Exiting Thread-1
Thread-2: Wed Mar 17 23:16:33 2021
Thread-1: Wed Mar 17 23:16:33 2021
Thread-2: Wed Mar 17 23:16:33 2021
Thread-2: Wed Mar 17 23:16:34 2021
Thread-1: Wed Mar 17 23:16:35 2021
Thread-2: Wed Mar 17 23:16:35 2021
Exiting Thread-2
Thread-2: Wed Mar 17 23:16:36 2021
Thread-2: Wed Mar 17 23:16:37 2021
Thread-1: Wed Mar 17 23:16:37 2021
Thread-2: Wed Mar 17 23:16:38 2021
Exiting Thread-2
Exiting Main Thread


#### Queue module

- Queue.get(): remove and return an item from the queue (by default, block until one is available).
- Queue.put(): put an item into the queue.
- Queue.task_done(): indicate that a formerly enqueued task is complete.
- Queue.join(): block until all items in the queue have been gotten and processed.
  - Queue.task_done() is not there for the workers' benefit; it exists to support Queue.join().
  - If you use Queue.join(), call task_done() once for every item you get(); otherwise join() blocks forever.
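A minimal sketch of how the four calls fit together: one hypothetical `worker` thread doubles numbers taken from the queue, and the main thread uses Queue.join() to wait until every item has been marked done. The worker is a daemon thread so that its endless loop does not keep the program alive.

```python
import threading
from queue import Queue

q = Queue()
processed = []

def worker():
    # Hypothetical worker: loops forever, doubling each item it receives
    while True:
        item = q.get()             # blocks until an item is available
        processed.append(item * 2)
        q.task_done()              # exactly one task_done() per get()

threading.Thread(target=worker, daemon=True).start()

for n in range(5):
    q.put(n)

q.join()                           # returns once task_done() was called for all 5 items
print(processed)                   # [0, 2, 4, 6, 8]
```

With a single worker the items are processed in FIFO order; with several workers the order of `processed` would no longer be guaranteed.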

#### Joins
- Thread joins let you spawn one or more child threads and then have the current thread wait until those threads have finished.
- Unlike a thread join, which waits for one specific thread, Queue.join() waits until every item put into the queue has been marked done, regardless of which worker processed it.

In [5]:
# example of a Thread subclass with a lock and a Queue
exitFlag = 0

class myThread(threading.Thread):
    def __init__(self, threadID, name, q):
        super().__init__()
        self.threadID = threadID
        self.name = name
        self.q = q

    def run(self):
        print("Starting " + self.name)
        process_data(self.name, self.q)
        print("Exiting " + self.name)

def process_data(threadName, q):
    # Poll the queue until the main thread sets exitFlag
    while not exitFlag:
        # Hold the lock so the empty() check and the get() happen as one step;
        # otherwise another worker could take the last item in between
        queueLock.acquire()
        if not q.empty():
            data = q.get()
            queueLock.release()
            print("%s processing %s" % (threadName, data))
            q.task_done()
        else:
            queueLock.release()
        time.sleep(1)

threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = Queue()
threads = []
threadID = 1

# Create new threads
for tName in threadList:
    thread = myThread(threadID, tName, workQueue)
    thread.start()
    threads.append(thread)
    threadID += 1

# Fill the queue
queueLock.acquire()
for word in nameList:
    workQueue.put(word)
queueLock.release()

# Block until every item has been marked done with task_done()
workQueue.join()

# Notify threads it's time to exit
exitFlag = 1

# Wait for all threads to complete
for t in threads:
    t.join()
print("Exiting Main Thread")

Starting Thread-1
Starting Thread-2
Starting Thread-3
Thread-1: Wed Mar 17 23:16:39 2021
Thread-2 processing OneThread-1 processing TwoThread-3 processing Three


Thread-3 processing FourThread-2 processing Five

Exiting Thread-1
Thread-2: Wed Mar 17 23:16:41 2021
Exiting Thread-3Exiting Thread-2

Exiting Main Thread
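The cell above polls the queue once per second and relies on a global exitFlag. A common alternative, sketched here as our own assumption rather than the pattern used in the original example, is to let each worker block on get() and to stop it by enqueuing a sentinel value. `SENTINEL = None` and the `results` list are hypothetical names.

```python
import threading
from queue import Queue

SENTINEL = None                 # hypothetical "stop" marker; must never be a real work item
work_queue = Queue()
results = []
results_lock = threading.Lock()

def process_data(thread_name, q):
    while True:
        data = q.get()          # block instead of polling with sleep()
        if data is SENTINEL:
            q.task_done()
            break               # this worker is done
        with results_lock:      # protect the shared results list
            results.append((thread_name, data))
        q.task_done()

threads = [threading.Thread(target=process_data, args=(f"Thread-{i}", work_queue))
           for i in range(1, 4)]
for t in threads:
    t.start()

for word in ["One", "Two", "Three", "Four", "Five"]:
    work_queue.put(word)
for _ in threads:
    work_queue.put(SENTINEL)    # one sentinel per worker, queued after all real items

for t in threads:
    t.join()
print(sorted(data for _, data in results))
```

Because the queue is FIFO and every sentinel is enqueued after the real items, each worker finishes its share of the work before it sees a sentinel, and idle workers wait in get() without burning CPU.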


#### Daemon thread
A daemon thread is a background thread that provides support to the main (non-daemon) threads. A daemon thread does not block the program from exiting: when only daemon threads are left, the interpreter exits and the daemon threads are stopped abruptly.

The daemon argument is optional and defaults to None; the main thread is a non-daemon thread (its daemon attribute is False).
- If daemon is set to True: when the main thread finishes, the child thread exits along with it, whether or not it is still running.
- If daemon is set to False: when the main thread finishes, the program waits for all such child threads to exit before it ends.
- If daemon is set to None: the daemonic property is inherited from the current (creating) thread.
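A sketch of the inheritance rule for daemon=None: a thread created from inside a daemon thread becomes a daemon itself, while a thread created by the (non-daemon) main thread does not. The `outer`/`inner` functions and the `observed` dict are hypothetical.

```python
import threading

observed = {}

def inner():
    observed["inner"] = threading.current_thread().daemon

def outer():
    observed["outer"] = threading.current_thread().daemon
    # daemon is not given (None), so it is inherited from the creating thread: True here
    t = threading.Thread(target=inner)
    t.start()
    t.join()

t = threading.Thread(target=outer, daemon=True)
t.start()
t.join()
print(observed)                                # {'outer': True, 'inner': True}
print(threading.Thread(target=inner).daemon)   # False: created by a non-daemon thread
```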

In [6]:
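# daemon = True. Run this as a script to see the effect: in a notebook the
# kernel process keeps running (and runs several threads of its own), so the
# daemon thread is not killed and its later lines (i=1, i=2) show up in the
# output of later cells.
# Typical output when run as a script:
# i=0,foo thread daemon is True
# Main thread daemon is False
# Main Thread Exit.
def test():
    def foo():
        for i in range(3):
            print('i={},foo thread daemon is {}'.format(i, threading.current_thread().daemon))
            time.sleep(1)

    t = threading.Thread(target=foo, daemon=True)
    t.start()

    # isDaemon() is deprecated since Python 3.10; read the daemon attribute instead
    print("Main thread daemon is {}".format(threading.current_thread().daemon))
    print("Main Thread Exit.")

test()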

i=0,foo thread daemon is True
Main thread daemon is False
Main Thread Exit.


In [7]:
# daemon = False: the main thread reaches its end first, but the process waits for t
def foo():
    for i in range(3):
        print('i={},foo thread daemon is {}'.format(i, threading.current_thread().daemon))
        time.sleep(1)

t = threading.Thread(target=foo, daemon=False)
t.start()

print("Main thread daemon is {}".format(threading.current_thread().daemon))
print("Main Thread Exit.")

i=0,foo thread daemon is False
Main thread daemon is False
Main Thread Exit.


#### ThreadPoolExecutor

concurrent.futures.ThreadPoolExecutor manages a pool of worker threads automatically.

https://www.shangyang.me/2018/05/26/python-thread-and-threadpool/ \
https://docs.python.org/3/library/concurrent.futures.html \
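- Executor (abstract base class; its concrete subclasses are ThreadPoolExecutor and ProcessPoolExecutor)
    - submit(fn, *args, **kwargs):
    schedules fn(*args, **kwargs) to be executed and returns a Future object.
    - map(func, *iterables, timeout=None, chunksize=1):
    similar to the built-in map(), with two differences: 1. the iterables are collected immediately rather than lazily; 2. func is executed asynchronously, and several calls to func may run concurrently. Results are still returned in the order of the inputs. With ProcessPoolExecutor, chunksize splits the iterables into chunks that are sent to the pool as separate tasks; for very long iterables, a large chunksize can improve performance. With ThreadPoolExecutor, chunksize has no effect.
    - shutdown(wait=True):
    frees the resources used by the executor; with wait=True it blocks until all pending futures are done and the worker threads or processes have been joined. A with statement calls shutdown() for you, so you do not need to call it explicitly.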
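A short sketch of executor.map() returning results in input order even when later inputs finish first. The `slow_square` task and its sleep times are hypothetical, picked so that completion order is the reverse of input order.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_square(x):
    # Hypothetical task: larger x sleeps less, so it finishes sooner
    time.sleep(0.1 * (3 - x))
    return x * x

with ThreadPoolExecutor(max_workers=3) as executor:   # shutdown(wait=True) runs on exit
    results = list(executor.map(slow_square, [0, 1, 2]))

print(results)   # [0, 1, 4]: map() yields results in input order, not completion order
```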

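- executor.submit() submits a task to the thread pool; if a worker is idle, the task starts right away. The method returns a Future object immediately. The Future encapsulates waiting for the asynchronous call and, once the call has finished, holds its result.
- future.result() behaves like thread.join(): it blocks the calling thread until the result is available. So if you hold several Future objects and call result() on each of them in turn, waiting on one of them keeps you from handling the others, even if they finished earlier.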
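A sketch of the blocking behavior of future.result(), using the standard time.sleep as the submitted task: submit() returns at once, result(timeout=...) gives up after the timeout, and a plain result() waits for completion. The 0.5 s and 0.1 s values are arbitrary choices for the illustration.

```python
import time
import concurrent.futures

timed_out = False
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(time.sleep, 0.5)   # returns a Future immediately
    print(future.done())                        # almost certainly False: sleep(0.5) is still running
    try:
        future.result(timeout=0.1)              # wait at most 0.1 s
    except concurrent.futures.TimeoutError:
        timed_out = True
        print("not finished after 0.1 s")
    print(future.result())                      # no timeout: block until sleep() returns None
print(future.done())                            # True
```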

In [8]:
import concurrent.futures
import urllib.request

In [9]:
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 2, 3)
    print(future.result())

8


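- Use concurrent.futures.as_completed() to handle futures as they finish. It does not block on one specific Future the way calling Future.result() in submission order does: it yields each Future as soon as it completes (finished or cancelled), so calling result() on a yielded Future returns right away. In the source, the waiting loop does not exit while any future is still pending.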
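A small, network-free sketch of the same idea: three hypothetical `nap` tasks are submitted in one order and, assuming the sleep times are far enough apart to dominate scheduling noise, as_completed() yields them in the order they finish.

```python
import time
import concurrent.futures

def nap(seconds):
    # Hypothetical task that just sleeps and reports how long it slept
    time.sleep(seconds)
    return seconds

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(nap, s) for s in (0.6, 0.1, 0.35)]
    completion_order = [f.result() for f in concurrent.futures.as_completed(futures)]

print(completion_order)   # expected [0.1, 0.35, 0.6]: the shortest nap finishes first
```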

In [10]:
URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

'http://www.foxnews.com/' page is 278075 bytes
'http://europe.wsj.com/' generated an exception: HTTP Error 403: Forbidden
'http://www.cnn.com/' page is 1134324 bytes
'http://some-made-up-domain.com/' page is 64668 bytes
i=1,foo thread daemon is True
i=1,foo thread daemon is False
'http://www.bbc.co.uk/' page is 302333 bytes
i=2,foo thread daemon is True
i=2,foo thread daemon is False
Thread-2: Wed Mar 17 23:16:45 2021
Thread-2: Wed Mar 17 23:16:49 2021


Multiprocessing vs Threading Python \
https://stackoverflow.com/questions/3044580/multiprocessing-vs-threading-python

- The threading module uses threads, the multiprocessing module uses processes. The difference is that threads run in the same memory space, while processes have separate memory. This makes it a bit harder to share objects between processes with multiprocessing. Since threads use the same memory, precautions (such as locks) have to be taken, or two threads may write to the same memory at the same time.

- Spawning processes is a bit slower than spawning threads.
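A sketch of the "precautions" mentioned above for threads: four threads increment one shared global counter, and a Lock serializes the read-modify-write of `counter += 1`. The `increment` function and the iteration counts are hypothetical.

```python
import threading

counter = 0
counter_lock = threading.Lock()

def increment(times):
    global counter
    for _ in range(times):
        # counter += 1 reads, adds and writes back; the lock keeps those steps together
        with counter_lock:
            counter += 1

threads = [threading.Thread(target=increment, args=(100_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)   # 400000: every thread updated the same global variable
```

With processes instead of threads, each worker would get its own copy of `counter`, which is why sharing state with multiprocessing needs explicit mechanisms.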
